From 6c8ee417c15c4e9405e575675549c111e5385b5a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Christian=20L=C3=BCck?= <christian@lueck.tv>
Date: Mon, 6 Nov 2017 18:57:52 +0100
Subject: [PATCH] Explicit close of unwrapped stream should not emit error
 event

---
 README.md                    | 26 ++++++++++++++++++++++++++
 src/UnwrapReadableStream.php |  8 +++++---
 src/UnwrapWritableStream.php |  8 +++++---
 tests/UnwrapReadableTest.php | 16 ++++++++++++++++
 tests/UnwrapWritableTest.php | 16 ++++++++++++++++
 5 files changed, 68 insertions(+), 6 deletions(-)

diff --git a/README.md b/README.md
index 770fc47..7647c22 100644
--- a/README.md
+++ b/README.md
@@ -167,6 +167,19 @@ If the given promise is already settled and does not resolve with an
 instance of `ReadableStreamInterface`, then you will not be able to receive
 the `error` event.
 
+You can `close()` the resulting stream at any time, which will either try to
+`cancel()` the pending promise or try to `close()` the underlying stream.
+
+```php
+$promise = startDownloadStream($uri);
+
+$stream = Stream\unwrapReadable($promise);
+
+$loop->addTimer(2.0, function () use ($stream) {
+    $stream->close();
+});
+```
+
 ### unwrapWritable()
 
 The `unwrapWritable(PromiseInterface $promise)` function can be used to unwrap
@@ -211,6 +224,19 @@ If the given promise is already settled and does not resolve with an
 instance of `WritableStreamInterface`, then you will not be able to receive
 the `error` event.
 
+You can `close()` the resulting stream at any time, which will either try to
+`cancel()` the pending promise or try to `close()` the underlying stream.
+
+```php
+$promise = startUploadStream($uri);
+
+$stream = Stream\unwrapWritable($promise);
+
+$loop->addTimer(2.0, function () use ($stream) {
+    $stream->close();
+});
+```
+
 ## Install
 
 The recommended way to install this library is [through Composer](https://getcomposer.org).
diff --git a/src/UnwrapReadableStream.php b/src/UnwrapReadableStream.php
index edf8a04..5b93fc8 100644
--- a/src/UnwrapReadableStream.php
+++ b/src/UnwrapReadableStream.php
@@ -75,9 +75,11 @@ function (ReadableStreamInterface $stream) use ($out, &$closed) {
 
                 return $stream;
             },
-            function ($e) use ($out) {
-                $out->emit('error', array($e, $out));
-                $out->close();
+            function ($e) use ($out, &$closed) {
+                if (!$closed) {
+                    $out->emit('error', array($e, $out));
+                    $out->close();
+                }
             }
         );
     }
diff --git a/src/UnwrapWritableStream.php b/src/UnwrapWritableStream.php
index acc7c73..d833f4c 100644
--- a/src/UnwrapWritableStream.php
+++ b/src/UnwrapWritableStream.php
@@ -88,9 +88,11 @@ function (WritableStreamInterface $stream) use ($out, &$store, &$buffer, &$endin
 
                 return $stream;
             },
-            function ($e) use ($out) {
-                $out->emit('error', array($e, $out));
-                $out->close();
+            function ($e) use ($out, &$closed) {
+                if (!$closed) {
+                    $out->emit('error', array($e, $out));
+                    $out->close();
+                }
             }
         );
     }
diff --git a/tests/UnwrapReadableTest.php b/tests/UnwrapReadableTest.php
index c0b5846..a5b5268 100644
--- a/tests/UnwrapReadableTest.php
+++ b/tests/UnwrapReadableTest.php
@@ -31,12 +31,28 @@ public function testClosingStreamMakesItNotReadable()
 
         $stream->on('close', $this->expectCallableOnce());
         $stream->on('end', $this->expectCallableNever());
+        $stream->on('error', $this->expectCallableNever());
 
         $stream->close();
 
         $this->assertFalse($stream->isReadable());
     }
 
+    public function testClosingRejectingStreamMakesItNotReadable()
+    {
+        $promise = Timer\reject(0.001, $this->loop);
+        $stream = Stream\unwrapReadable($promise);
+
+        $stream->on('close', $this->expectCallableOnce());
+        $stream->on('end', $this->expectCallableNever());
+        $stream->on('error', $this->expectCallableNever());
+
+        $stream->close();
+        $this->loop->run();
+
+        $this->assertFalse($stream->isReadable());
+    }
+
     public function testClosingStreamWillCancelInputPromiseAndMakeStreamNotReadable()
     {
         $promise = new \React\Promise\Promise(function () { }, $this->expectCallableOnce());
diff --git a/tests/UnwrapWritableTest.php b/tests/UnwrapWritableTest.php
index e21500e..5a8d093 100644
--- a/tests/UnwrapWritableTest.php
+++ b/tests/UnwrapWritableTest.php
@@ -30,12 +30,27 @@ public function testClosingStreamMakesItNotWritable()
         $stream = Stream\unwrapWritable($promise);
 
         $stream->on('close', $this->expectCallableOnce());
+        $stream->on('error', $this->expectCallableNever());
 
         $stream->close();
 
         $this->assertFalse($stream->isWritable());
     }
 
+    public function testClosingRejectingStreamMakesItNotWritable()
+    {
+        $promise = Timer\reject(0.001, $this->loop);
+        $stream = Stream\unwrapWritable($promise);
+
+        $stream->on('close', $this->expectCallableOnce());
+        $stream->on('error', $this->expectCallableNever());
+
+        $stream->close();
+        $this->loop->run();
+
+        $this->assertFalse($stream->isWritable());
+    }
+
     public function testClosingStreamWillCancelInputPromiseAndMakeStreamNotWritable()
     {
         $promise = new \React\Promise\Promise(function () { }, $this->expectCallableOnce());
@@ -247,6 +262,7 @@ public function testEmitsCloseOnlyOnceWhenClosingStreamMultipleTimes()
         $stream = Stream\unwrapWritable($promise);
 
         $stream->on('close', $this->expectCallableOnce());
+        $stream->on('error', $this->expectCallableNever());
 
         $stream->close();
         $stream->close();