diff --git a/Cargo.lock b/Cargo.lock index beb2b9f..c0ce12c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -28,6 +28,8 @@ name = "billion_row_challenge" version = "1.0.0" dependencies = [ "assert_cmd", + "crossbeam-channel", + "rayon", ] [[package]] @@ -41,6 +43,40 @@ dependencies = [ "serde", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-deque" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" + [[package]] name = "difflib" version = "0.4.0" @@ -53,6 +89,12 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" +[[package]] +name = "either" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3dca9240753cf90908d7e4aac30f630662b02aebaa1b58a3cadabdb23385b58b" + [[package]] name = "libc" version = "0.2.155" @@ -110,6 +152,26 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rayon" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "regex-automata" version = "0.4.7" diff --git a/Cargo.toml b/Cargo.toml index 09b1b41..e41b98a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,8 @@ edition = "2021" rust-version = "1.79.0" [dependencies] +rayon = { version = "=1.10.0" } +crossbeam-channel = { version = "=0.5.13" } [dev-dependencies] -assert_cmd = "2.0.14" +assert_cmd = { version = "=2.0.14" } diff --git a/src/lib.rs b/src/lib.rs index 09bc69c..3a91531 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,19 +1,68 @@ use config::Config; +use crossbeam_channel::{bounded, unbounded}; use error::RunError; -use std::fs; +use rayon::prelude::*; +use std::fs::File; +use std::io::{BufRead, BufReader}; use weather::WeatherStations; pub mod config; pub mod error; pub mod weather; +/// Configuration to control memory usage by ensuring that at most `CHUNK_COUNT` chunks of `CHUNK_SIZE` lines each are held in memory at any given time, while still allowing for parallel processing of the data. + +/// Number of lines to process in one chunk. +const CHUNK_SIZE: usize = 100_000; + +/// Number of chunks to process at the same time. +const CHUNK_COUNT: usize = 100; + pub fn run(config: &Config) -> Result<(), RunError> { - let file_content = fs::read_to_string(&config.input_file_path)?; - let mut lines: Vec = file_content.lines().map(|line| line.to_string()).collect(); - let mut weather_stations = WeatherStations::default(); - for line in lines.iter_mut() { - weather_stations.add_measurement(line); + let file = File::open(&config.input_file_path)?; + let reader = BufReader::new(file); + let (sender, receiver) = bounded::>(CHUNK_COUNT); + let (error_sender, error_receiver) = unbounded(); + std::thread::spawn(move || { + let mut chunk = Vec::with_capacity(CHUNK_SIZE); + for line in reader.lines() { + match line { + Ok(line) => { + chunk.push(line); + if chunk.len() >= CHUNK_SIZE { + if sender.send(chunk).is_err() { + return; + } + chunk = Vec::with_capacity(CHUNK_SIZE); + } + } + Err(error) => { + error_sender.send(error).unwrap(); + return; + } + } + } + if !chunk.is_empty() { + sender.send(chunk).unwrap(); + } + }); + if let Ok(error) = error_receiver.try_recv() { + return Err(RunError::InputOutputError(error)); } + let weather_stations: WeatherStations = receiver + .into_iter() + .par_bridge() + .map(|chunk| { + let mut local_weather_stations = WeatherStations::default(); + for line in chunk { + local_weather_stations.add_measurement(&line); + } + local_weather_stations + }) + .reduce(WeatherStations::default, |mut a, b| { + a.merge(b); + a + }); println!("{}", weather_stations.output()); Ok(()) } diff --git a/src/weather.rs b/src/weather.rs index 0c868a5..3fd1974 100644 --- a/src/weather.rs +++ b/src/weather.rs @@ -22,7 +22,6 @@ impl FromStr for WeatherStationMeasurement { /// inside [`Err`]. /// /// # Examples - /// /// ``` /// use std::str::FromStr; /// use billion_row_challenge::weather::WeatherStationMeasurement; @@ -79,6 +78,7 @@ impl WeatherStationMeasurements { /// let expected_output = "Bosaso=-15.0/10.0/20.3"; /// let actual_output = weather_station.output(); /// assert_eq!(actual_output, expected_output); + /// ``` pub fn output(&self) -> String { format!( "{}={:.1}/{:.1}/{:.1}", @@ -158,6 +158,7 @@ impl WeatherStations { /// let expected_output = "{Bosaso=-15.0/10.0/20.0, Petropavlovsk-Kamchatsky=-10.0/0.0/10.0}"; /// let actual_output = weather_stations.output(); /// assert_eq!(actual_output, expected_output); + /// ``` pub fn output(&self) -> String { let mut outputs: Vec = vec![]; let mut station_names: Vec<&String> = self.stations.keys().collect(); @@ -168,4 +169,27 @@ impl WeatherStations { } format!("{{{}}}", outputs.join(", ")) } + + pub fn merge(&mut self, other: WeatherStations) { + for weather_station_measurements in other.stations.values() { + match self.stations.get_mut(&weather_station_measurements.name) { + Some(weather_station_found) => { + weather_station_found.minimum = weather_station_found + .minimum + .min(weather_station_measurements.minimum); + weather_station_found.maximum = weather_station_found + .maximum + .max(weather_station_measurements.maximum); + weather_station_found.count += weather_station_measurements.count; + weather_station_found.sum += weather_station_measurements.sum; + } + None => { + self.stations.insert( + weather_station_measurements.name.clone(), + weather_station_measurements.clone(), + ); + } + } + } + } }