1
1
mirror of https://github.com/theoludwig/billion_row_challenge.git synced 2024-12-08 00:45:46 +01:00

perf: parallel execution

This commit is contained in:
Théo LUDWIG 2024-06-25 00:02:31 +02:00
parent a6c30c1785
commit ce432af7b8
Signed by: theoludwig
GPG Key ID: ADFE5A563D718F3B
4 changed files with 145 additions and 8 deletions

62
Cargo.lock generated
View File

@ -28,6 +28,8 @@ name = "billion_row_challenge"
version = "1.0.0" version = "1.0.0"
dependencies = [ dependencies = [
"assert_cmd", "assert_cmd",
"crossbeam-channel",
"rayon",
] ]
[[package]] [[package]]
@ -41,6 +43,40 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "crossbeam-channel"
version = "0.5.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-deque"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d"
dependencies = [
"crossbeam-epoch",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80"
[[package]] [[package]]
name = "difflib" name = "difflib"
version = "0.4.0" version = "0.4.0"
@ -53,6 +89,12 @@ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10"
[[package]]
name = "either"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3dca9240753cf90908d7e4aac30f630662b02aebaa1b58a3cadabdb23385b58b"
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.155" version = "0.2.155"
@ -110,6 +152,26 @@ dependencies = [
"proc-macro2", "proc-macro2",
] ]
[[package]]
name = "rayon"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa"
dependencies = [
"either",
"rayon-core",
]
[[package]]
name = "rayon-core"
version = "1.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2"
dependencies = [
"crossbeam-deque",
"crossbeam-utils",
]
[[package]] [[package]]
name = "regex-automata" name = "regex-automata"
version = "0.4.7" version = "0.4.7"

View File

@ -5,6 +5,8 @@ edition = "2021"
rust-version = "1.79.0" rust-version = "1.79.0"
[dependencies] [dependencies]
rayon = { version = "=1.10.0" }
crossbeam-channel = { version = "=0.5.13" }
[dev-dependencies] [dev-dependencies]
assert_cmd = "2.0.14" assert_cmd = { version = "=2.0.14" }

View File

@ -1,19 +1,68 @@
use config::Config; use config::Config;
use crossbeam_channel::{bounded, unbounded};
use error::RunError; use error::RunError;
use std::fs; use rayon::prelude::*;
use std::fs::File;
use std::io::{BufRead, BufReader};
use weather::WeatherStations; use weather::WeatherStations;
pub mod config; pub mod config;
pub mod error; pub mod error;
pub mod weather; pub mod weather;
/// Configuration to control memory usage by ensuring that at most `CHUNK_COUNT` chunks of `CHUNK_SIZE` lines each are held in memory at any given time, while still allowing for parallel processing of the data.
/// Number of lines to process in one chunk.
const CHUNK_SIZE: usize = 100_000;
/// Number of chunks to process at the same time.
const CHUNK_COUNT: usize = 100;
pub fn run(config: &Config) -> Result<(), RunError> { pub fn run(config: &Config) -> Result<(), RunError> {
let file_content = fs::read_to_string(&config.input_file_path)?; let file = File::open(&config.input_file_path)?;
let mut lines: Vec<String> = file_content.lines().map(|line| line.to_string()).collect(); let reader = BufReader::new(file);
let mut weather_stations = WeatherStations::default(); let (sender, receiver) = bounded::<Vec<String>>(CHUNK_COUNT);
for line in lines.iter_mut() { let (error_sender, error_receiver) = unbounded();
weather_stations.add_measurement(line); std::thread::spawn(move || {
let mut chunk = Vec::with_capacity(CHUNK_SIZE);
for line in reader.lines() {
match line {
Ok(line) => {
chunk.push(line);
if chunk.len() >= CHUNK_SIZE {
if sender.send(chunk).is_err() {
return;
}
chunk = Vec::with_capacity(CHUNK_SIZE);
}
}
Err(error) => {
error_sender.send(error).unwrap();
return;
}
}
}
if !chunk.is_empty() {
sender.send(chunk).unwrap();
}
});
if let Ok(error) = error_receiver.try_recv() {
return Err(RunError::InputOutputError(error));
} }
let weather_stations: WeatherStations = receiver
.into_iter()
.par_bridge()
.map(|chunk| {
let mut local_weather_stations = WeatherStations::default();
for line in chunk {
local_weather_stations.add_measurement(&line);
}
local_weather_stations
})
.reduce(WeatherStations::default, |mut a, b| {
a.merge(b);
a
});
println!("{}", weather_stations.output()); println!("{}", weather_stations.output());
Ok(()) Ok(())
} }

View File

@ -22,7 +22,6 @@ impl FromStr for WeatherStationMeasurement {
/// inside [`Err`]. /// inside [`Err`].
/// ///
/// # Examples /// # Examples
///
/// ``` /// ```
/// use std::str::FromStr; /// use std::str::FromStr;
/// use billion_row_challenge::weather::WeatherStationMeasurement; /// use billion_row_challenge::weather::WeatherStationMeasurement;
@ -79,6 +78,7 @@ impl WeatherStationMeasurements {
/// let expected_output = "Bosaso=-15.0/10.0/20.3"; /// let expected_output = "Bosaso=-15.0/10.0/20.3";
/// let actual_output = weather_station.output(); /// let actual_output = weather_station.output();
/// assert_eq!(actual_output, expected_output); /// assert_eq!(actual_output, expected_output);
/// ```
pub fn output(&self) -> String { pub fn output(&self) -> String {
format!( format!(
"{}={:.1}/{:.1}/{:.1}", "{}={:.1}/{:.1}/{:.1}",
@ -158,6 +158,7 @@ impl WeatherStations {
/// let expected_output = "{Bosaso=-15.0/10.0/20.0, Petropavlovsk-Kamchatsky=-10.0/0.0/10.0}"; /// let expected_output = "{Bosaso=-15.0/10.0/20.0, Petropavlovsk-Kamchatsky=-10.0/0.0/10.0}";
/// let actual_output = weather_stations.output(); /// let actual_output = weather_stations.output();
/// assert_eq!(actual_output, expected_output); /// assert_eq!(actual_output, expected_output);
/// ```
pub fn output(&self) -> String { pub fn output(&self) -> String {
let mut outputs: Vec<String> = vec![]; let mut outputs: Vec<String> = vec![];
let mut station_names: Vec<&String> = self.stations.keys().collect(); let mut station_names: Vec<&String> = self.stations.keys().collect();
@ -168,4 +169,27 @@ impl WeatherStations {
} }
format!("{{{}}}", outputs.join(", ")) format!("{{{}}}", outputs.join(", "))
} }
pub fn merge(&mut self, other: WeatherStations) {
for weather_station_measurements in other.stations.values() {
match self.stations.get_mut(&weather_station_measurements.name) {
Some(weather_station_found) => {
weather_station_found.minimum = weather_station_found
.minimum
.min(weather_station_measurements.minimum);
weather_station_found.maximum = weather_station_found
.maximum
.max(weather_station_measurements.maximum);
weather_station_found.count += weather_station_measurements.count;
weather_station_found.sum += weather_station_measurements.sum;
}
None => {
self.stations.insert(
weather_station_measurements.name.clone(),
weather_station_measurements.clone(),
);
}
}
}
}
} }