From dedbc6a9a63bc3e5688f2f7da5aa94d480508996 Mon Sep 17 00:00:00 2001 From: Rick Newton-Rogers Date: Fri, 20 Sep 2024 11:26:50 +0100 Subject: [PATCH] Revert "Adopt `NIOThrowingAsyncSequenceProducer` (#2879)" This reverts commit 282f5935cf3352b3d026c35eb57cb3619dd9536f. --- Sources/NIOFileSystem/DirectoryEntries.swift | 212 +++++--------- Sources/NIOFileSystem/FileChunks.swift | 264 ++++++------------ .../Internal/BufferedOrAnyStream.swift | 24 +- .../FileHandleTests.swift | 2 +- 4 files changed, 165 insertions(+), 337 deletions(-) diff --git a/Sources/NIOFileSystem/DirectoryEntries.swift b/Sources/NIOFileSystem/DirectoryEntries.swift index 105b2b7b85..60734ff402 100644 --- a/Sources/NIOFileSystem/DirectoryEntries.swift +++ b/Sources/NIOFileSystem/DirectoryEntries.swift @@ -16,7 +16,6 @@ import CNIODarwin import CNIOLinux import NIOConcurrencyHelpers -import NIOCore import NIOPosix @preconcurrency import SystemPackage @@ -90,17 +89,17 @@ extension DirectoryEntries { public typealias AsyncIterator = BatchedIterator public typealias Element = [DirectoryEntry] - private let stream: BufferedOrAnyStream<[DirectoryEntry], DirectoryEntryProducer> + private let stream: BufferedOrAnyStream<[DirectoryEntry]> /// Creates a ``DirectoryEntries/Batched`` sequence by wrapping an `AsyncSequence` /// of directory entry batches. public init(wrapping sequence: S) where S.Element == Element { - self.stream = BufferedOrAnyStream<[DirectoryEntry], DirectoryEntryProducer>(wrapping: sequence) + self.stream = BufferedOrAnyStream(wrapping: sequence) } fileprivate init(handle: SystemFileHandle, recursive: Bool) { // Expanding the batches yields watermarks of 256 and 512 directory entries. - let stream = NIOThrowingAsyncSequenceProducer.makeBatchedDirectoryEntryStream( + let stream = BufferedStream.makeBatchedDirectoryEntryStream( handle: handle, recursive: recursive, entriesPerBatch: 64, @@ -117,11 +116,9 @@ extension DirectoryEntries { /// An `AsyncIteratorProtocol` of `Array`. public struct BatchedIterator: AsyncIteratorProtocol { - private var iterator: BufferedOrAnyStream<[DirectoryEntry], DirectoryEntryProducer>.AsyncIterator + private var iterator: BufferedOrAnyStream<[DirectoryEntry]>.AsyncIterator - fileprivate init( - wrapping iterator: BufferedOrAnyStream<[DirectoryEntry], DirectoryEntryProducer>.AsyncIterator - ) { + init(wrapping iterator: BufferedOrAnyStream<[DirectoryEntry]>.AsyncIterator) { self.iterator = iterator } @@ -138,95 +135,52 @@ extension DirectoryEntries.Batched.AsyncIterator: Sendable {} // MARK: - Internal @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -extension NIOThrowingAsyncSequenceProducer -where - Element == [DirectoryEntry], - Failure == (any Error), - Strategy == NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, - Delegate == DirectoryEntryProducer -{ +extension BufferedStream where Element == [DirectoryEntry] { fileprivate static func makeBatchedDirectoryEntryStream( handle: SystemFileHandle, recursive: Bool, entriesPerBatch: Int, lowWatermark: Int, highWatermark: Int - ) -> NIOThrowingAsyncSequenceProducer< - [DirectoryEntry], any Error, NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, - DirectoryEntryProducer - > { - let producer = DirectoryEntryProducer( - handle: handle, - recursive: recursive, - entriesPerBatch: entriesPerBatch - ) + ) -> BufferedStream<[DirectoryEntry]> { + let state = DirectoryEnumerator(handle: handle, recursive: recursive) + let protectedState = NIOLockedValueBox(state) - let nioThrowingAsyncSequence = NIOThrowingAsyncSequenceProducer.makeSequence( - elementType: [DirectoryEntry].self, - backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark( - lowWatermark: lowWatermark, - highWatermark: highWatermark - ), - finishOnDeinit: false, - delegate: producer + var (stream, source) = BufferedStream.makeStream( + of: [DirectoryEntry].self, + backPressureStrategy: .watermark(low: lowWatermark, high: highWatermark) ) - producer.setSequenceProducerSource(nioThrowingAsyncSequence.source) + source.onTermination = { + guard let threadPool = protectedState.withLockedValue({ $0.threadPoolForClosing() }) else { + return + } + + threadPool.submit { _ in // always run, even if cancelled + protectedState.withLockedValue { state in + state.closeIfNecessary() + } + } + } + + let producer = DirectoryEntryProducer( + state: protectedState, + source: source, + entriesPerBatch: entriesPerBatch + ) + // Start producing immediately. + producer.produceMore() - return nioThrowingAsyncSequence.sequence + return stream } } @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -private typealias DirectoryEntrySequenceProducer = NIOThrowingAsyncSequenceProducer< - [DirectoryEntry], Error, NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, DirectoryEntryProducer -> - -@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -private final class DirectoryEntryProducer: NIOAsyncSequenceProducerDelegate { +private struct DirectoryEntryProducer { let state: NIOLockedValueBox + let source: BufferedStream<[DirectoryEntry]>.Source let entriesPerBatch: Int - init(handle: SystemFileHandle, recursive: Bool, entriesPerBatch: Int) { - let state = DirectoryEnumerator(handle: handle, recursive: recursive) - self.state = NIOLockedValueBox(state) - self.entriesPerBatch = entriesPerBatch - } - - func didTerminate() { - guard let threadPool = self.state.withLockedValue({ $0.threadPoolForClosing() }) else { - return - } - - threadPool.submit { _ in // always run, even if cancelled - self.state.withLockedValue { state in - state.closeIfNecessary() - } - } - } - - /// sets the source within the producer state - func setSequenceProducerSource(_ sequenceProducerSource: DirectoryEntrySequenceProducer.Source) { - self.state.withLockedValue { state in - switch state.state { - case .idle: - state.sequenceProducerSource = sequenceProducerSource - case .done: - sequenceProducerSource.finish() - case .open, .openPausedProducing: - fatalError() - case .modifying: - fatalError() - } - } - } - - func clearSource() { - self.state.withLockedValue { state in - state.sequenceProducerSource = nil - } - } - /// The 'entry point' for producing elements. /// /// Calling this function will start producing directory entries asynchronously by dispatching @@ -253,12 +207,6 @@ private final class DirectoryEntryProducer: NIOAsyncSequenceProducerDelegate { } } - func pauseProducing() { - self.state.withLockedValue { state in - state.pauseProducing() - } - } - private func nextBatch() throws -> [DirectoryEntry] { try self.state.withLockedValue { state in try state.next(self.entriesPerBatch) @@ -273,28 +221,14 @@ private final class DirectoryEntryProducer: NIOAsyncSequenceProducerDelegate { // Failed to read more entries: close and notify the stream so consumers receive the // error. self.close() - let source = self.state.withLockedValue { state in - state.sequenceProducerSource - } - source?.finish(error) - self.clearSource() + self.source.finish(throwing: error) } } private func onNextBatch(_ entries: [DirectoryEntry]) { - let source = self.state.withLockedValue { state in - state.sequenceProducerSource - } - - guard let source else { - assertionFailure("unexpectedly missing source") - return - } - // No entries were read: this must be the end (as the batch size must be greater than zero). if entries.isEmpty { - source.finish() - self.clearSource() + self.source.finish(throwing: nil) return } @@ -302,22 +236,30 @@ private final class DirectoryEntryProducer: NIOAsyncSequenceProducerDelegate { let readEOF = entries.count < self.entriesPerBatch // Entries were produced: yield them and maybe produce more. - let writeResult = source.yield(contentsOf: CollectionOfOne(entries)) - - // Exit early if EOF was read; no use in trying to produce more. - if readEOF { - source.finish() - self.clearSource() - return - } + do { + let writeResult = try self.source.write(contentsOf: CollectionOfOne(entries)) + // Exit early if EOF was read; no use in trying to produce more. + if readEOF { + self.source.finish(throwing: nil) + return + } - switch writeResult { - case .produceMore: - self.produceMore() - case .stopProducing: - self.pauseProducing() - case .dropped: - // The source is finished; mark ourselves as done. + switch writeResult { + case .produceMore: + self.produceMore() + case let .enqueueCallback(token): + self.source.enqueueCallback(callbackToken: token) { + switch $0 { + case .success: + self.produceMore() + case .failure: + self.close() + } + } + } + } catch { + // Failure to write means the source is already done, that's okay we just need to + // update our state and stop producing. self.close() } } @@ -340,30 +282,25 @@ private final class DirectoryEntryProducer: NIOAsyncSequenceProducerDelegate { /// Note that this is not a `Sequence` because we allow for errors to be thrown on `next()`. @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) private struct DirectoryEnumerator: Sendable { - internal enum State: @unchecked Sendable { + private enum State: @unchecked Sendable { case modifying case idle(SystemFileHandle.SendableView, recursive: Bool) case open(NIOThreadPool, Source, [DirectoryEntry]) - case openPausedProducing(NIOThreadPool, Source, [DirectoryEntry]) case done } /// The source of directory entries. - internal enum Source { + private enum Source { case readdir(CInterop.DirPointer) case fts(CInterop.FTSPointer) } /// The current state of enumeration. - internal var state: State + private var state: State /// The path to the directory being enumerated. private let path: FilePath - /// The route via which directory entry batches are yielded, - /// the sourcing end of the `DirectoryEntrySequenceProducer` - internal var sequenceProducerSource: DirectoryEntrySequenceProducer.Source? - /// Information about an entry returned by FTS. See 'fts(3)'. private enum FTSInfo: Hashable, Sendable { case directoryPreOrder @@ -416,15 +353,12 @@ private struct DirectoryEnumerator: Sendable { self.path = handle.path } - internal mutating func produceMore() -> NIOThreadPool? { + internal func produceMore() -> NIOThreadPool? { switch self.state { case let .idle(handle, _): return handle.threadPool case let .open(threadPool, _, _): return threadPool - case .openPausedProducing(let threadPool, let source, let array): - self.state = .open(threadPool, source, array) - return threadPool case .done: return nil case .modifying: @@ -432,22 +366,9 @@ private struct DirectoryEnumerator: Sendable { } } - internal mutating func pauseProducing() { - switch self.state { - case .open(let threadPool, let source, let array): - self.state = .openPausedProducing(threadPool, source, array) - case .idle: - () // we won't apply back pressure until something has been read - case .openPausedProducing, .done: - () // no-op - case .modifying: - fatalError() - } - } - internal func threadPoolForClosing() -> NIOThreadPool? { switch self.state { - case .open(let threadPool, _, _), .openPausedProducing(let threadPool, _, _): + case let .open(threadPool, _, _): return threadPool case .idle, .done: // Don't need to close in the idle state: we don't own the handle. @@ -476,7 +397,7 @@ private struct DirectoryEnumerator: Sendable { // We don't own the handle so don't close it. self.state = .done - case .open(_, let mode, _), .openPausedProducing(_, let mode, _): + case let .open(_, mode, _): self.state = .done switch mode { case .readdir(let dir): @@ -710,9 +631,6 @@ private struct DirectoryEnumerator: Sendable { return result } - case .openPausedProducing: - return .yield(.success([])) - case .done: return .yield(.success([])) diff --git a/Sources/NIOFileSystem/FileChunks.swift b/Sources/NIOFileSystem/FileChunks.swift index 13739b7091..b37648a67f 100644 --- a/Sources/NIOFileSystem/FileChunks.swift +++ b/Sources/NIOFileSystem/FileChunks.swift @@ -29,7 +29,7 @@ public struct FileChunks: AsyncSequence, Sendable { public typealias Element = ByteBuffer /// The underlying buffered stream. - private let stream: BufferedOrAnyStream + private let stream: BufferedOrAnyStream /// Create a ``FileChunks`` sequence backed by wrapping an `AsyncSequence`. public init(wrapping sequence: S) where S.Element == ByteBuffer { @@ -50,7 +50,7 @@ public struct FileChunks: AsyncSequence, Sendable { // TODO: choose reasonable watermarks; this should likely be at least somewhat dependent // on the chunk size. - let stream = NIOThrowingAsyncSequenceProducer.makeFileChunksStream( + let stream = BufferedStream.makeFileChunksStream( of: ByteBuffer.self, handle: handle, chunkLength: chunkLength.bytes, @@ -67,9 +67,9 @@ public struct FileChunks: AsyncSequence, Sendable { } public struct FileChunkIterator: AsyncIteratorProtocol { - private var iterator: BufferedOrAnyStream.AsyncIterator + private var iterator: BufferedOrAnyStream.AsyncIterator - fileprivate init(wrapping iterator: BufferedOrAnyStream.AsyncIterator) { + fileprivate init(wrapping iterator: BufferedOrAnyStream.AsyncIterator) { self.iterator = iterator } @@ -85,18 +85,7 @@ extension FileChunks.FileChunkIterator: Sendable {} // MARK: - Internal @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -private typealias FileChunkSequenceProducer = NIOThrowingAsyncSequenceProducer< - ByteBuffer, Error, NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, FileChunkProducer -> - -@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -extension NIOThrowingAsyncSequenceProducer -where - Element == ByteBuffer, - Failure == Error, - Strategy == NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, - Delegate == FileChunkProducer -{ +extension BufferedStream where Element == ByteBuffer { static func makeFileChunksStream( of: Element.Type = Element.self, handle: SystemFileHandle, @@ -104,77 +93,53 @@ where range: FileChunks.ChunkRange, lowWatermark: Int, highWatermark: Int - ) -> FileChunkSequenceProducer { + ) -> BufferedStream { + let state: ProducerState + switch range { + case .entireFile: + state = ProducerState(handle: handle, range: nil) + case .partial(let partialRange): + state = ProducerState(handle: handle, range: partialRange) + } + let protectedState = NIOLockedValueBox(state) - let producer = FileChunkProducer( - range: range, - handle: handle, - length: chunkLength + var (stream, source) = BufferedStream.makeStream( + of: ByteBuffer.self, + backPressureStrategy: .watermark(low: lowWatermark, high: highWatermark) ) - let nioThrowingAsyncSequence = NIOThrowingAsyncSequenceProducer.makeSequence( - elementType: ByteBuffer.self, - backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark( - lowWatermark: 4, - highWatermark: 8 - ), - finishOnDeinit: false, - delegate: producer - ) + source.onTermination = { + protectedState.withLockedValue { state in + state.done() + } + } - producer.setSource(nioThrowingAsyncSequence.source) + // Start producing immediately. + let producer = FileChunkProducer( + state: protectedState, + source: source, + path: handle.path, + length: chunkLength + ) + producer.produceMore() - return nioThrowingAsyncSequence.sequence + return stream } } @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -private final class FileChunkProducer: NIOAsyncSequenceProducerDelegate, Sendable { +private struct FileChunkProducer: Sendable { let state: NIOLockedValueBox - + let source: BufferedStream.Source let path: FilePath let length: Int64 - init(range: FileChunks.ChunkRange, handle: SystemFileHandle, length: Int64) { - let state: ProducerState - switch range { - case .entireFile: - state = .init(handle: handle, range: nil) - case .partial(let partialRange): - state = .init(handle: handle, range: partialRange) - } - - self.state = NIOLockedValueBox(state) - self.path = handle.path - self.length = length - } - - /// sets the source within the producer state - func setSource(_ source: FileChunkSequenceProducer.Source) { - self.state.withLockedValue { state in - switch state.state { - case .producing, .pausedProducing: - state.source = source - case .done(let emptyRange): - if emptyRange { - source.finish() - } - } - } - } - - func clearSource() { - self.state.withLockedValue { state in - state.source = nil - } - } - /// The 'entry point' for producing elements. /// /// Calling this function will start producing file chunks asynchronously by dispatching work /// to the IO executor and feeding the result back to the stream source. On yielding to the /// source it will either produce more or be scheduled to produce more. Stopping production - /// is signaled via the stream's 'onTermination' handler. + /// is signalled via the stream's 'onTermination' handler. func produceMore() { let threadPool = self.state.withLockedValue { state in state.shouldProduceMore() @@ -228,74 +193,65 @@ private final class FileChunkProducer: NIOAsyncSequenceProducerDelegate, Sendabl case let .failure(error): // Failed to read: update our state then notify the stream so consumers receive the // error. - - let source = self.state.withLockedValue { state in - state.done() - return state.source - } - source?.finish(error) - self.clearSource() + self.state.withLockedValue { state in state.done() } + self.source.finish(throwing: error) } } - private func onReadNextChunk(_ buffer: ByteBuffer) { + private func onReadNextChunk(_ bytes: ByteBuffer) { // Reading short means EOF. - let readEOF = buffer.readableBytes < self.length - assert(buffer.readableBytes <= self.length) + let readEOF = bytes.readableBytes < self.length + assert(bytes.readableBytes <= self.length) - let source = self.state.withLockedValue { state in + self.state.withLockedValue { state in if readEOF { - state.didReadEnd() + state.readEnd() } else { - state.didReadBytes(buffer.readableBytes) + state.readBytes(bytes.readableBytes) } - return state.source - } - - guard let source else { - assertionFailure("unexpectedly missing source") - return } // No bytes were read: this must be the end as length is required to be greater than zero. - if buffer.readableBytes == 0 { + if bytes.readableBytes == 0 { assert(readEOF, "read zero bytes but did not read EOF") - source.finish() - self.clearSource() + self.source.finish(throwing: nil) return } // Bytes were produced: yield them and maybe produce more. - let writeResult = source.yield(contentsOf: CollectionOfOne(buffer)) - - // Exit early if EOF was read; no use in trying to produce more. - if readEOF { - source.finish() - self.clearSource() - return - } + do { + let writeResult = try self.source.write(contentsOf: CollectionOfOne(bytes)) + // Exit early if EOF was read; no use in trying to produce more. + if readEOF { + self.source.finish(throwing: nil) + return + } - switch writeResult { - case .produceMore: - self.produceMore() - case .stopProducing: - self.state.withLockedValue { state in state.pauseProducing() } - case .dropped: - // The source is finished; mark ourselves as done. + switch writeResult { + case .produceMore: + self.produceMore() + case let .enqueueCallback(token): + self.source.enqueueCallback(callbackToken: token) { + switch $0 { + case .success: + self.produceMore() + case .failure: + // The source is finished; mark ourselves as done. + self.state.withLockedValue { state in state.done() } + } + } + } + } catch { + // Failure to write means the source is already done, that's okay we just need to + // update our state and stop producing. self.state.withLockedValue { state in state.done() } } } - - func didTerminate() { - self.state.withLockedValue { state in - state.done() - } - } } @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) private struct ProducerState: Sendable { - fileprivate struct Producing { + private struct Producing { /// The handle to read from. var handle: SystemFileHandle.SendableView @@ -306,24 +262,18 @@ private struct ProducerState: Sendable { var range: Range? } - internal enum State { + private enum State { /// Can potentially produce values (if the handle is not closed). case producing(Producing) - /// Backpressure policy means that we should stop producing new values for now - case pausedProducing(Producing) /// Done producing values either by reaching EOF, some error or the stream terminating. - case done(emptyRange: Bool) + case done } - internal var state: State - - /// The route via which file chunks are yielded, - /// the sourcing end of the `FileChunkSequenceProducer` - internal var source: FileChunkSequenceProducer.Source? + private var state: State init(handle: SystemFileHandle, range: Range?) { if let range, range.isEmpty { - self.state = .done(emptyRange: true) + self.state = .done } else { self.state = .producing(.init(handle: handle.sendableView, range: range)) } @@ -333,8 +283,6 @@ private struct ProducerState: Sendable { switch self.state { case let .producing(state): return state.handle.threadPool - case .pausedProducing: - return nil case .done: return nil } @@ -354,77 +302,45 @@ private struct ProducerState: Sendable { ) return .failure(error) } - case .pausedProducing: - return .success(nil) case .done: return .success(nil) } } - mutating func didReadBytes(_ count: Int) { + mutating func readBytes(_ count: Int) { switch self.state { case var .producing(state): - if state.didReadBytes(count) { - self.state = .done(emptyRange: false) - } else { - self.state = .producing(state) - } - case var .pausedProducing(state): - if state.didReadBytes(count) { - self.state = .done(emptyRange: false) + if let currentRange = state.range { + let newRange = (currentRange.lowerBound + Int64(count))..= newRange.upperBound { + assert(newRange.lowerBound == newRange.upperBound) + self.state = .done + } else { + state.range = newRange + self.state = .producing(state) + } } else { - self.state = .pausedProducing(state) + if count == 0 { + self.state = .done + } } case .done: () } } - mutating func didReadEnd() { + mutating func readEnd() { switch self.state { - case .pausedProducing, .producing: - self.state = .done(emptyRange: false) + case .producing: + self.state = .done case .done: () } } - mutating func pauseProducing() { - switch self.state { - case .producing(let state): - self.state = .pausedProducing(state) - case .pausedProducing, .done: - () - } - } - mutating func done() { - self.state = .done(emptyRange: false) + self.state = .done } } -@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -extension ProducerState.Producing { - /// Updates the range (the offsets to read from and up to) to reflect the number of bytes which have been read. - /// - Parameter count: The number of bytes which have been read. - /// - Returns: Returns `True` if there are no remaining bytes to read, `False` otherwise. - mutating func didReadBytes(_ count: Int) -> Bool { - guard let currentRange = self.range else { - // if we read 0 bytes we are done - return count == 0 - } - - let newLowerBound = currentRange.lowerBound + Int64(count) - - // we have run out of bytes to read, we are done - if newLowerBound >= currentRange.upperBound { - self.range = currentRange.upperBound..`` or ``AnyAsyncSequence``. +/// Wraps a ``BufferedStream`` or ``AnyAsyncSequence``. @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -internal enum BufferedOrAnyStream { - typealias AsyncSequenceProducer = NIOThrowingAsyncSequenceProducer< - Element, Error, NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, Delegate - > - - case nioThrowingAsyncSequenceProducer(AsyncSequenceProducer) +internal enum BufferedOrAnyStream { + case bufferedStream(BufferedStream) case anyAsyncSequence(AnyAsyncSequence) - internal init(wrapping stream: AsyncSequenceProducer) { - self = .nioThrowingAsyncSequenceProducer(stream) + internal init(wrapping stream: BufferedStream) { + self = .bufferedStream(stream) } internal init(wrapping stream: S) where S.Element == Element { @@ -35,7 +29,7 @@ internal enum BufferedOrAnyStream AsyncIterator { switch self { - case let .nioThrowingAsyncSequenceProducer(stream): + case let .bufferedStream(stream): return AsyncIterator(wrapping: stream.makeAsyncIterator()) case let .anyAsyncSequence(stream): return AsyncIterator(wrapping: stream.makeAsyncIterator()) @@ -43,13 +37,13 @@ internal enum BufferedOrAnyStream.AsyncIterator) case anyAsyncSequence(AnyAsyncSequence.AsyncIterator) internal mutating func next() async throws -> Element? { let element: Element? switch self { - case let .bufferedStream(iterator): + case var .bufferedStream(iterator): defer { self = .bufferedStream(iterator) } element = try await iterator.next() case var .anyAsyncSequence(iterator): @@ -59,7 +53,7 @@ internal enum BufferedOrAnyStream.AsyncIterator) { self = .bufferedStream(iterator) } diff --git a/Tests/NIOFileSystemIntegrationTests/FileHandleTests.swift b/Tests/NIOFileSystemIntegrationTests/FileHandleTests.swift index 17217b4008..6e79be82da 100644 --- a/Tests/NIOFileSystemIntegrationTests/FileHandleTests.swift +++ b/Tests/NIOFileSystemIntegrationTests/FileHandleTests.swift @@ -546,7 +546,7 @@ final class FileHandleTests: XCTestCase { func testReadRangeLongerThanChunkAndNotMultipleOfChunkLength() async throws { // Reading chunks of bytes from within a range longer than the chunklength - // and with size not a multiple of the chunklength. + // and with size not a multiple of the chunklegth. try await self.withHandle(forFileAtPath: Self.thisFile) { handle in var bytes = ByteBuffer() for try await chunk in handle.readChunks(in: 0...200, chunkLength: .bytes(128)) {