Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

parallel letter frequency example in tasks #1

Merged
merged 1 commit into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions gleam.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ gleam_stdlib = "~> 0.34 or ~> 1.0"
gleam_erlang = "~> 0.24"
gleam_otp = "~> 0.9"
prng = "~> 3.0"
simplifile = ">= 1.7.0 and < 2.0.0"
birl = ">= 1.5.0 and < 2.0.0"

[dev-dependencies]
gleeunit = "~> 1.0"
10 changes: 8 additions & 2 deletions manifest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,23 @@
# You typically do not need to edit this file

packages = [
{ name = "birl", version = "1.5.0", build_tools = ["gleam"], requirements = ["gleam_stdlib", "ranger"], otp_app = "birl", source = "hex", outer_checksum = "23BFE5AB0D7D9E4ECC5BB89B7ABDDF8E976D98C65D2E173D116E6AAFBF24E633" },
{ name = "filepath", version = "1.0.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "filepath", source = "hex", outer_checksum = "EFB6FF65C98B2A16378ABC3EE2B14124168C0CE5201553DE652E2644DCFDB594" },
{ name = "gleam_bitwise", version = "1.3.1", build_tools = ["gleam"], requirements = [], otp_app = "gleam_bitwise", source = "hex", outer_checksum = "B36E1D3188D7F594C7FD4F43D0D2CE17561DE896202017548578B16FE1FE9EFC" },
{ name = "gleam_erlang", version = "0.24.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "26BDB52E61889F56A291CB34167315780EE4AA20961917314446542C90D1C1A0" },
{ name = "gleam_otp", version = "0.9.0", build_tools = ["gleam"], requirements = ["gleam_stdlib", "gleam_erlang"], otp_app = "gleam_otp", source = "hex", outer_checksum = "5FADBBEC5ECF3F8B6BE91101D432758503192AE2ADBAD5602158977341489F71" },
{ name = "gleam_otp", version = "0.9.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "5FADBBEC5ECF3F8B6BE91101D432758503192AE2ADBAD5602158977341489F71" },
{ name = "gleam_stdlib", version = "0.35.1", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "5443EEB74708454B65650FEBBB1EF5175057D1DEC62AEA9D7C6D96F41DA79152" },
{ name = "gleeunit", version = "1.0.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "D364C87AFEB26BDB4FB8A5ABDE67D635DC9FA52D6AB68416044C35B096C6882D" },
{ name = "prng", version = "3.0.1", build_tools = ["gleam"], requirements = ["gleam_bitwise", "gleam_stdlib"], otp_app = "prng", source = "hex", outer_checksum = "C78A80DE41469A0BB1AB3B0B0610CCE5DB70C5659A540E2E0E6C54FA38134290" },
{ name = "ranger", version = "1.1.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "ranger", source = "hex", outer_checksum = "28E615AE7590ED922AF1510DDF606A2ECBBC2A9609AF36D412EDC925F06DFD20" },
{ name = "simplifile", version = "1.7.0", build_tools = ["gleam"], requirements = ["filepath", "gleam_stdlib"], otp_app = "simplifile", source = "hex", outer_checksum = "1D5DFA3A2F9319EC85825F6ED88B8E449F381B0D55A62F5E61424E748E7DDEB0" },
]

[requirements]
birl = { version = ">= 1.5.0 and < 2.0.0"}
gleam_erlang = { version = "~> 0.24" }
gleam_otp = { version = "~> 0.9" }
gleam_stdlib = { version = "~> 0.34 or ~> 1.0" }
gleeunit = { version = "~> 1.0" }
prng = { version = "~> 3.0"}
prng = { version = "~> 3.0" }
simplifile = { version = ">= 1.7.0 and < 2.0.0" }
2 changes: 1 addition & 1 deletion src/actors/pantry.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
////
////

import gleam/otp/actor
import gleam/erlang/process.{type Subject}
import gleam/otp/actor
import gleam/set.{type Set}

// Below this comment are the public functions that we want to expose to other modules.
Expand Down
4 changes: 2 additions & 2 deletions src/concurrency_primitives.gleam
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import gleam/io
import gleam/erlang/process.{type Subject}
import gleam/string
import gleam/function
import gleam/int
import gleam/io
import gleam/string

pub fn main() {
// A "process" in gleam is a lightweight, concurrent unit of execution.
Expand Down
2 changes: 1 addition & 1 deletion src/supervisors.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@
////
//// Back? Great, read on.

import gleam/erlang/process.{type Subject}
import gleam/io
import gleam/otp/supervisor
import gleam/erlang/process.{type Subject}
import supervisors/a_shit_actor as duckduckgoose

pub fn main() {
Expand Down
2 changes: 1 addition & 1 deletion src/supervisors/a_shit_actor.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
//// around the circle. Interestingly in the midwest of the United States the
//// game is often called "Duck, Duck, Grey Duck".)

import gleam/otp/actor
import gleam/erlang/process.{type Subject}
import gleam/function
import gleam/otp/actor
import prng/random

/// Okay, well this is new.
Expand Down
93 changes: 90 additions & 3 deletions src/tasks.gleam
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
//// Tasks are one-off processes meant to easily make synchronous work async.
//// They're really straightforward to use: just fire them off and check back later.

import gleam/io
import gleam/otp/task
import gleam/dict.{type Dict}
import gleam/erlang
import gleam/erlang/process
import gleam/list
import gleam/int
import gleam/io
import gleam/list
import gleam/option.{None, Some}
import gleam/otp/task
import gleam/result
import gleam/string
import simplifile
import birl
import birl/duration

pub fn main() {
// Do a thing in a different process
Expand Down Expand Up @@ -78,4 +85,84 @@ pub fn main() {
Ok(val) -> io.println("The 100th item is" <> int.to_string(val))
Error(Nil) -> io.println_error("The list has fewer than 100 items")
}

// Alright, let's do a concurrency hello world example: Parallel Letter Frequency.
// We'll write a function that takes a list of codepoints and counts the frequency each one
// appears. We use a list of codepoints instead of a string because dealing with graphemes
// properly just distracts from the point of this exercise.

use workload <- result.try(simplifile.read("./src/tasks/king_james_bible.txt"))
let workload = string.to_utf_codepoints(workload)

// Doing work concurrently is about finding work that can be split into repeatable
// chunks. Therefore when trying to split work into parts, it's usually a good idea
// to start with the simple linear version then try to reuse it :)

let linear_freq = time("linear frequency", fn() { linear_letter_frequency(workload) })

// Okay, now that that's working, let's split the work into appropriate chunks
// and do the chunks in separate tasks

let parallel_freq = time("parallel frequency", fn() { parallel_letter_frequency(workload, 200_000) })

// Little sanity check
case linear_freq == parallel_freq {
True -> io.println("Our parallel and linear frequency functions produced the same output")
False -> io.println("Our parallel and linear frequency functions produced different output")
}

// Returning an OK because we used result.try
Ok(Nil)
}

// Baseline single-process implementation, used both for comparison and as the
// per-chunk worker of the parallel version.
//
// Walks the codepoints once, building a dict that maps each codepoint to the
// number of times it has been seen so far.
fn linear_letter_frequency(input: List(UtfCodepoint)) -> Dict(UtfCodepoint, Int) {
  list.fold(input, dict.new(), fn(counts, codepoint) {
    dict.update(counts, codepoint, fn(existing) {
      case existing {
        // First sighting of this codepoint.
        None -> 1
        // Seen before: bump the stored count.
        Some(seen) -> seen + 1
      }
    })
  })
}

// Concurrent implementation of the letter frequency count.
// (On a multi-core machine the Erlang runtime should spread the tasks across
// cores, making this properly parallel.)
//
// `input` is split into chunks of `chunk_size` codepoints, each chunk is counted
// in its own task with `linear_letter_frequency`, and the per-chunk counts are
// merged into one dict.
fn parallel_letter_frequency(
  input: List(UtfCodepoint),
  chunk_size: Int,
) -> Dict(UtfCodepoint, Int) {
  // Start one task per chunk. The whole list of handles is built before any
  // result is awaited below.
  let handles =
    input
    |> list.sized_chunk(chunk_size)
    |> list.map(fn(chunk) { task.async(fn() { linear_letter_frequency(chunk) }) })

  // Await the tasks in order, merging each partial result as it arrives.
  list.fold(handles, dict.new(), fn(totals, handle) {
    // NOTE(review): 1000 is the await timeout, presumably milliseconds per
    // gleam_otp's task API - confirm.
    let chunk_counts = task.await(handle, 1000)

    // The merge happens inside the fold over the handles on purpose: we don't
    // want to await the next task until we're out of work to do here.
    dict.fold(chunk_counts, totals, fn(merged, codepoint, count) {
      dict.update(merged, codepoint, fn(existing) {
        case existing {
          None -> count
          Some(previous) -> previous + count
        }
      })
    })
  })
}

// Small timing helper to help us see the results of our work.
//
// Runs `f`, prints "<name> took: <n>ms" using the elapsed time between two
// `birl.now()` readings taken before and after the call, then returns whatever
// `f` returned.
fn time(name: String, f: fn() -> a) -> a {
  let started_at = birl.now()
  let result = f()

  // Take the second reading only after `f` has finished.
  let elapsed_ms =
    birl.now()
    |> birl.difference(started_at)
    |> duration.blur_to(duration.MilliSecond)

  io.println(name <> " took: " <> int.to_string(elapsed_ms) <> "ms")
  result
}
Loading
Loading