Compare commits

...

11 Commits

Author SHA1 Message Date
Andrew J. Stone 2992284f37 Bump version to 0.3.0 2018-02-27 01:51:14 -05:00
Andrew J. Stone 968cc4204b
Merge pull request #8 from andrewjstone/debug-format
Implement Debug for TimerHeap<T>
2018-02-27 01:48:11 -05:00
Andrew J. Stone 5e45b94a6a Implement Debug for TimerHeap<T>
Use a separate type (DebugEntry) for formatting TimerEntrys during
debug formatting of TimerHeap so that we can still see the other fields when
debug formatting TimerEntrys outside of formatting a TimerHeap.

Fixes #5
2018-02-27 01:37:30 -05:00
Andrew J. Stone 1a00a785eb
Merge pull request #7 from jonhoo/fine-remaining
Expose fine-grained time remaining
2018-02-26 01:23:51 -05:00
Jon Gjengset 4b3316a19e
Expose fine-grained time remaining
Fixes #6
2018-02-22 13:35:32 -05:00
Andrew J. Stone 5afae607ab Bump version to 0.2.0 2017-12-10 14:38:47 -05:00
Andrew J. Stone f3e973070d Add upsert method 2017-12-10 14:30:05 -05:00
Andrew J. Stone 09689c5700
Merge pull request #4 from andrewjstone/expired-iterator
Return an Iterator from expired method
2017-12-09 15:33:12 -05:00
Andrew J. Stone f375216d39 Return an Iterator from expired method
Return an `Expired` struct that implements Iterator from `expired()`
instead of a Vec to avoid an allocation.
2017-12-09 01:17:51 -05:00
Andrew J. Stone 85e9d0dd9e
Merge pull request #1 from jonhoo/patch-1
Add repository link
2017-12-08 22:58:11 -05:00
Jon Gjengset f48b5b63b8
Add repository link
This will cause a link to the GitHub project to appear on crates.io and docs.rs.
2017-12-04 14:29:20 -05:00
2 changed files with 160 additions and 73 deletions

View File

@ -1,8 +1,9 @@
[package]
name = "timer_heap"
version = "0.1.1"
version = "0.3.0"
authors = ["Andrew J. Stone <andrew.j.stone.1@gmail.com>"]
description = "A binary heap based timer management system"
repository = "https://github.com/andrewjstone/timer_heap"
keywords = ["timer", "heap"]
license = "Apache-2.0"

View File

@ -30,6 +30,8 @@ use std::collections::{BinaryHeap, HashMap};
use std::cmp::{Ordering, Ord, PartialOrd, PartialEq};
use std::time::{Instant, Duration};
use std::hash::Hash;
use std::fmt::{self, Debug};
use std::convert::From;
#[derive(Debug)]
pub enum Error {
@ -46,6 +48,42 @@ pub enum TimerType {
Recurring
}
/// An Iterator over expired timers
pub struct Expired<'a, T> where T: 'a {
now: Instant,
heap: &'a mut TimerHeap<T>
}
impl<'a, T> Iterator for Expired<'a, T> where T: Eq + Clone + Hash {
type Item = T;
fn next(&mut self) -> Option<T> {
while let Some(mut popped) = self.heap.timers.pop() {
if popped.expires_at <= self.now {
if self.heap.active.get(&popped.key) != Some(&popped.counter) {
// Drop an old deleted timer
continue;
}
if popped.recurring {
let key = popped.key.clone();
// We use the expires_at time so we don't keep skewing later and later
// by adding the duration to the current time.
popped.expires_at += popped.duration;
self.heap.timers.push(popped);
return Some(key);
} else {
let _ = self.heap.active.remove(&popped.key);
return Some(popped.key);
}
} else {
self.heap.timers.push(popped);
return None;
}
}
None
}
}
/// Store timers in a binary heap. Keep them sorted by which timer is going to expire first.
pub struct TimerHeap<T> {
timers: BinaryHeap<TimerEntry<T>>,
@ -62,6 +100,17 @@ pub struct TimerHeap<T> {
counter: u64
}
impl<T:Debug + Eq + Clone + Hash + Ord> Debug for TimerHeap<T> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_map()
.entries(self.timers
.iter()
.filter(|e| self.is_active(e))
.map(|e| (&e.key, DebugEntry::from(e))))
.finish()
}
}
impl<T: Eq + Clone + Hash> TimerHeap<T> {
/// Create a new TimerHeap
pub fn new() -> TimerHeap<T> {
@ -77,7 +126,9 @@ impl<T: Eq + Clone + Hash> TimerHeap<T> {
self.timers.len()
}
/// Insert a TimerEntry into the heap
/// Insert a timer into the heap
///
/// Return an error if the key already exists.
pub fn insert(&mut self, key: T, duration: Duration, ty: TimerType) -> Result<(), Error> {
self._insert(key, duration, ty, Instant::now())
}
@ -94,6 +145,17 @@ impl<T: Eq + Clone + Hash> TimerHeap<T> {
Ok(())
}
/// Insert a timer into the heap, replacing any existing timer if one exists
///
/// Return true if a timer already existed in the heap, false otherwise
pub fn upsert(&mut self, key: T, duration: Duration, ty: TimerType) -> bool {
let entry = TimerEntry::new(key.clone(), duration, ty, Instant::now(), self.counter);
self.timers.push(entry);
let existed = self.active.insert(key, self.counter).is_some();
self.counter += 1;
existed
}
/// Remove a TimerEnry by Id
///
/// Return true if it exists, false otherwise
@ -101,80 +163,57 @@ impl<T: Eq + Clone + Hash> TimerHeap<T> {
self.active.remove(&key).is_some()
}
/// Return the amount of time remaining (in ms) for the earliest expiring timer
/// Return `None` if there are no timers in the heap
pub fn time_remaining(&self) -> Option<u64> {
/// Return the amount of time remaining for the earliest expiring timer.
/// Return `None` if there are no timers in the heap.
pub fn time_remaining(&self) -> Option<Duration> {
self._time_remaining(Instant::now())
}
/// A deterministically testable version of `time_remaining()`
fn _time_remaining(&self, now: Instant) -> Option<u64> {
self.timers.iter().find(|e| {
self.active.get(&e.key) == Some(&e.counter)
}).map(|e| {
if now > e.expires_at {
return 0;
}
let duration = e.expires_at - now;
// We add a millisecond if there is a fractional ms milliseconds in
// duration.subsec_nanos() / 1000000 so that we never fire early.
let nanos = duration.subsec_nanos() as u64;
// TODO: This can almost certainly be done faster
let subsec_ms = nanos / 1000000;
let mut remaining = duration.as_secs()*1000 + subsec_ms;
if subsec_ms * 1000000 < nanos {
remaining += 1;
}
remaining
})
fn _time_remaining(&self, now: Instant) -> Option<Duration> {
self.timers
.iter()
.find(|e| self.is_active(e))
.map(|e| {
if now > e.expires_at {
return Duration::new(0, 0);
}
e.expires_at - now
})
}
/// Return the earliest timeout based on a user timeout and the least remaining time in the
/// next timer to fire.
pub fn earliest_timeout(&self, user_timeout_ms: usize) -> usize {
pub fn earliest_timeout(&self, user_timeout: Duration) -> Duration {
if let Some(remaining) = self.time_remaining() {
if user_timeout_ms < remaining as usize {
user_timeout_ms
if user_timeout < remaining {
user_timeout
} else {
remaining as usize
remaining
}
} else {
user_timeout_ms
user_timeout
}
}
/// Return all expired keys
///
/// Any recurring timers will be re-added to the heap in the correct spot
pub fn expired(&mut self) -> Vec<T> {
pub fn expired(&mut self) -> Expired<T> {
self._expired(Instant::now())
}
/// A deterministically testable version of `expired()`
fn _expired(&mut self, now: Instant) -> Vec<T> {
let mut expired = Vec::new();
while let Some(mut popped) = self.timers.pop() {
if popped.expires_at <= now {
if self.active.get(&popped.key) != Some(&popped.counter) {
// Drop an old deleted timer
continue;
}
if popped.recurring {
expired.push(popped.key.clone());
// We use the expired_at time so we don't keep skewing later and later
// by adding the duration to the current time.
popped.expires_at += popped.duration;
self.timers.push(popped);
} else {
let _ = self.active.remove(&popped.key);
expired.push(popped.key)
}
} else {
self.timers.push(popped);
return expired;
}
fn _expired(&mut self, now: Instant) -> Expired<T> {
Expired {
now: now,
heap: self
}
expired
}
/// Is a given entry still active ?
fn is_active(&self, entry: &TimerEntry<T>) -> bool {
self.active.get(&entry.key) == Some(&entry.counter)
}
}
@ -187,6 +226,33 @@ struct TimerEntry<T> {
counter: u64
}
/// A timer entry used only when debug formatting a TimerHeap
struct DebugEntry {
recurring: bool,
expires_at: Instant,
duration: Duration,
}
impl Debug for DebugEntry {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Timer")
.field("recurring", &self.recurring)
.field("expires_at", &self.expires_at)
.field("duration", &self.duration)
.finish()
}
}
impl<'a, T> From<&'a TimerEntry<T>> for DebugEntry {
fn from(e: &TimerEntry<T>) -> Self {
DebugEntry {
recurring: e.recurring,
expires_at: e.expires_at,
duration: e.duration
}
}
}
impl<T> TimerEntry<T> {
pub fn new(key: T,
duration: Duration,
@ -237,19 +303,29 @@ mod tests {
use super::{TimerHeap, TimerType, Error};
use std::time::{Instant, Duration};
// Run this test with `cargo test -- --nocapture` to see the debug output
#[test]
fn time_remaining() {
let mut heap = TimerHeap::new();
let now = Instant::now();
let duration = Duration::from_millis(500);
heap._insert(1u64, duration, TimerType::Oneshot, now).unwrap();
assert_matches!(heap._time_remaining(now), Some(500));
assert_matches!(heap._time_remaining(now + duration), Some(0));
assert_matches!(heap._time_remaining(now + duration + Duration::from_millis(100)),
Some(0));
heap._insert(1u64, duration, TimerType::Oneshot, now)
.unwrap();
println!("Active Oneshot Timer: {:?}", heap);
assert_eq!(heap._time_remaining(now), Some(Duration::from_millis(500)));
assert_eq!(
heap._time_remaining(now + duration),
Some(Duration::new(0, 0))
);
println!("Expired Oneshot Timer: {:?}", heap);
assert_eq!(
heap._time_remaining(now + duration + Duration::from_millis(100)),
Some(Duration::new(0, 0))
);
assert_eq!(heap.remove(2), false);
assert!(heap.remove(1));
assert_matches!(heap._time_remaining(now), None);
println!("Empty heap: {:?}", heap);
assert_eq!(heap._time_remaining(now), None);
}
#[test]
@ -258,12 +334,12 @@ mod tests {
let now = Instant::now();
let duration = Duration::from_millis(500);
heap._insert(1u64, duration, TimerType::Oneshot, now).unwrap();
assert_eq!(heap._expired(now), vec![]);
let v = heap._expired(now + duration);
assert_eq!(heap._expired(now).count(), 0);
let count = heap._expired(now + duration).count();
assert_eq!(heap.active.len(), 0);
assert_eq!(v.len(), 1);
assert_eq!(count, 1);
assert_eq!(heap.len(), 0);
assert_eq!(heap._expired(now + duration), vec![]);
assert_eq!(heap._expired(now + duration).next(), None);
}
#[test]
@ -272,15 +348,15 @@ mod tests {
let now = Instant::now();
let duration = Duration::from_millis(500);
heap._insert(1u64, duration, TimerType::Recurring, now).unwrap();
assert_eq!(heap._expired(now), vec![]);
let v = heap._expired(now + duration);
assert_eq!(v.len(), 1);
assert_eq!(heap._expired(now).count(), 0);
let count = heap._expired(now + duration).count();
assert_eq!(count, 1);
assert_eq!(heap.len(), 1);
assert_eq!(heap._expired(now + duration + Duration::from_millis(1)), vec![]);
let v = heap._expired(now + duration + duration);
assert_eq!(v.len(), 1);
assert_eq!(heap._expired(now + duration + Duration::from_millis(1)).count(), 0);
let count = heap._expired(now + duration + duration).count();
assert_eq!(count, 1);
assert_eq!(heap.len(), 1);
assert_eq!(heap._expired(now + duration + duration), vec![]);
assert_eq!(heap._expired(now + duration + duration).count(), 0);
}
#[test]
@ -298,7 +374,7 @@ mod tests {
let duration = Duration::from_millis(500);
heap._insert(1u64, duration, TimerType::Recurring, now).unwrap();
assert_eq!(heap.remove(1u64), true);
assert_eq!(heap._expired(now + duration), vec![]);
assert_eq!(heap._expired(now + duration).count(), 0);
assert_eq!(heap.len(), 0);
}
@ -310,9 +386,19 @@ mod tests {
heap._insert(1u64, duration, TimerType::Oneshot, now).unwrap();
assert_eq!(heap.remove(1u64), true);
heap._insert(1u64, duration, TimerType::Oneshot, now + duration).unwrap();
let v = heap._expired(now + duration + duration);
assert_eq!(v.len(), 1);
assert_eq!(heap._expired(now + duration + duration).count(), 1);
assert_eq!(heap.active.len(), 0);
assert_eq!(heap.len(), 0);
}
#[test]
fn upsert() {
let mut heap = TimerHeap::new();
let duration = Duration::from_millis(500);
heap.insert(1u64, duration, TimerType::Oneshot).unwrap();
assert_eq!(heap.upsert(1u64, duration, TimerType::Oneshot), true);
assert_eq!(heap.remove(1u64), true);
assert_eq!(heap.upsert(1u64, duration, TimerType::Oneshot), false);
assert_eq!(heap.upsert(1u64, duration, TimerType::Oneshot), true);
}
}