diff --git a/src/gleam/erlang/process.gleam b/src/gleam/erlang/process.gleam index 7172233..68ae7aa 100644 --- a/src/gleam/erlang/process.gleam +++ b/src/gleam/erlang/process.gleam @@ -239,18 +239,21 @@ pub fn send(subject: Subject(message), message: message) -> Nil { /// This function will panic if a process tries to receive with a non-named /// subject that it does not own. /// +/// of note, this function will error even if there are queued up messages +/// within the process' mailbox directed at a named subject that was +/// *previously* registered to this process, and then got re-registered +/// to another. +/// To recieve messages on subjects not directed at the current pid, +/// use `Select` +/// pub fn receive( from subject: Subject(message), within timeout: Int, ) -> Result(message, Nil) { - case subject { - NamedSubject(..) -> perform_receive(subject, timeout) - Subject(owner:, ..) -> - case owner == self() { - True -> perform_receive(subject, timeout) - False -> - panic as "Cannot receive with a subject owned by another process" - } + let self = self() + case subject_owner(subject) { + Ok(pid) if pid == self -> perform_receive(subject, timeout) + _ -> panic as "Cannot receive with a subject owned by another process" } } @@ -263,8 +266,16 @@ fn perform_receive( /// Receive a message that has been sent to current process using the `Subject`. /// /// Same as `receive` but waits forever and returns the message as is. +pub fn receive_forever(from subject: Subject(message)) -> message { + let self = self() + case subject_owner(subject) { + Ok(pid) if pid == self -> perform_receive_forever(subject) + _ -> panic as "Cannot receive with a subject owned by another process" + } +} + @external(erlang, "gleam_erlang_ffi", "receive") -pub fn receive_forever(from subject: Subject(message)) -> message +fn perform_receive_forever(from subject: Subject(message)) -> message /// A type that enables a process to wait for messages from multiple `Subject`s /// at the same time, returning whichever message arrives first. diff --git a/test/gleam/erlang/process_test.gleam b/test/gleam/erlang/process_test.gleam index 1a86c2b..1aacc5d 100644 --- a/test/gleam/erlang/process_test.gleam +++ b/test/gleam/erlang/process_test.gleam @@ -112,6 +112,15 @@ pub fn receive_forever_test() { let assert 2 = process.receive_forever(subject) } +pub fn receive_forever_other_test() { + let subject = process.new_subject() + process.spawn(fn() { process.send(subject, process.new_subject()) }) + let subject = process.receive_forever(subject) + + assert assert_panic(fn() { process.receive_forever(subject) }) + == "Cannot receive with a subject owned by another process" +} + pub fn is_alive_test() { let pid = process.spawn_unlinked(fn() { Nil }) process.sleep(5) @@ -666,3 +675,69 @@ pub fn name_test() { let assert Ok("Hello") = process.receive(subject, 0) process.unregister(name) } + +pub fn name_other_test() { + let name = process.new_name("name") + let pid = process.spawn_unlinked(fn() { process.sleep(10) }) + let assert Ok(_) = process.register(pid, name) + let subject = process.named_subject(name) + process.send(subject, "Hello") + assert assert_panic(fn() { process.receive(subject, 5) }) + == "Cannot receive with a subject owned by another process" + process.unregister(name) +} + +// tests that sending/receiving on names fail if the name gets unregistered +pub fn unregister_receive_test() { + let name = process.new_name("name") + let assert Ok(_) = process.register(process.self(), name) + let subject = process.named_subject(name) + + process.send(subject, "Hello") + let assert Ok("Hello") = process.receive(subject, 0) + + let assert Ok(_) = process.unregister(name) + + assert assert_panic(fn() { process.send(subject, "Hello") }) + == "Sending to unregistered name" + assert assert_panic(fn() { process.receive(subject, 0) }) + == "Cannot receive with a subject owned by another process" + // this error message could be better +} + +// tests that receiving on names fail if the registration gets moved during execution +pub fn name_other_switchover_test() { + let name = process.new_name("name") + let self = process.self() + let assert Ok(_) = process.register(self, name) + let subject = process.named_subject(name) + + assert Ok(self) == process.subject_owner(subject) + + process.send(subject, "Hello") + let assert Ok("Hello") = process.receive(subject, 0) + + let pid = + process.spawn(fn() { + process.sleep(10) + let assert Ok(_) = process.unregister(name) + // "undesirable behaviour" + let assert Ok(_) = process.register(process.self(), name) + + process.sleep(20) + let assert Ok("Peace") = process.receive(subject, 0) + }) + + assert Ok(self) == process.subject_owner(subject) + + process.send(subject, "World") + let assert Ok("World") = process.receive(subject, 0) + + assert Ok(self) == process.subject_owner(subject) + process.sleep(12) + assert Ok(pid) == process.subject_owner(subject) + + process.send(subject, "Peace") + assert assert_panic(fn() { process.receive(subject, 0) }) + == "Cannot receive with a subject owned by another process" +}