
Catch only non-fatal exceptions in Source.lines #187

Closed
wants to merge 1 commit into from

Conversation

@wjoel (Contributor) commented Aug 4, 2024

Otherwise Control-C doesn't terminate the program.

@adamw (Member) commented Aug 5, 2024

Could you provide an example where this doesn't work? In the stream operators, we generally catch all errors as a rule, and propagate them to the channel. The idea is that only the ultimate consumer will throw, and in effect wind down the whole scope. But that idea might be wrong, of course ;)
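
Roughly, the pattern looks like this (just a sketch, not the actual Source.lines code - mapAsSource is a made-up operator name, but the channel calls mirror the snippets later in this thread). The producing fork never throws into its own scope; it forwards any exception to the channel, and only the consumer re-throws:

  def mapAsSource[A, B](in: Source[A])(f: A => B)(using Ox, StageCapacity): Source[B] =
    val out = StageCapacity.newChannel[B]
    fork {
      try
        in.foreach(a => out.send(f(a)))
        out.done() // signal completion to the consumer
      catch
        // the current "catch everything" style under discussion: the error is
        // forwarded to the channel instead of failing the scope
        case t: Throwable => out.errorOrClosed(t).discard
    }
    out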

@wjoel (Contributor, Author) commented Aug 5, 2024

Well, to be fair, I have a slightly modified version of this which does a tail on a file, so it runs continuously, "forever", and in that scenario it's very obvious (and very annoying) that Control-C doesn't work.

However, I would assume that without the change in this PR, if you have a very, very large file then Control-C won't actually interrupt your program until that file has been consumed (i.e. the source has been read to the very end). I might have time to test that this evening (I need to find or create some very large text file and maybe put it on a slow USB stick).

I just started with Ox, so I might be doing something incorrectly. I did a quick search for catch case e, and this was the only hit outside of tests and examples. Given the advice at https://ox.softwaremill.com/latest/structured-concurrency/interruptions.html#interruptions it seemed like a mistake, and changing it did fix my issue with Control-C, but it might be the wrong fix?

@adamw (Member) commented Aug 5, 2024

@wjoel Yes, you're right, we don't follow our own advice here ;) See e.g. catch case t: Throwable => c.errorOrClosed(t) - it's the same pattern, all exceptions are propagated. (Although this should probably be case e: Exception, not Throwable.)
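
To illustrate (a sketch only - doRead and c are placeholders): scala.util.control.NonFatal does not match InterruptedException, while catching Throwable (or even Exception, since InterruptedException extends Exception) does, and that's exactly what blocks interruption:

  // Swallows interrupts: an InterruptedException thrown while blocked is caught
  // and forwarded like any other error, so inside a loop the fork never stops.
  try doRead() catch case t: Throwable => c.errorOrClosed(t).discard

  // Lets interrupts through: NonFatal does not match InterruptedException (or
  // other fatal errors), so shutting down the scope can still end the fork.
  try doRead() catch case NonFatal(e) => c.errorOrClosed(e).discard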

But I still think that ctrl+c should work just fine - it should interrupt the top-level consuming thread, which should end the entire scope, interrupting all daemon forks (such as the one created by fromFile). There's no "right" way to use Ox yet, so it would be great to get at least an outline of how you're trying to use it - ideally the API would be such that people would simply do the "right thing" :) But since it seems we're not there yet, this example is even more interesting.

@wjoel (Contributor, Author) commented Aug 5, 2024

@adamw tl;dr it's fine, but maybe the fork-forever behavior in the face of interrupts is unexpected? Maybe not?

I see, ok. First I verified that a simple "OxCat" can be interrupted with Control-C, and it works as it should:

  def run(using Ox, IO): Unit =
    supervised:
      IO.unsafe:
        println("hello")
        Source.fromFile(Paths.get("/usr/share/dict/linux.words")).linesUtf8.foreach: line =>
          println(line)
    ExitCode.Success

After thinking about it for a bit, I think the main issue was that I didn't understand the purpose of c.errorOrClosed, because I had this:

  def tail(path: Path, chunkSize: Int = 1024)(using Ox, StageCapacity, IO): Source[Chunk[Byte]] =
    if Files.isDirectory(path) then throw new IOException(s"Path $path is a directory")
    val chunks = StageCapacity.newChannel[Chunk[Byte]]

    fork {
      forever {
        val jFileChannel =
          try FileChannel.open(path, StandardOpenOption.READ)
          catch
            case _: UnsupportedOperationException =>
              // Some file systems don't support file channels
              Files.newByteChannel(path, StandardOpenOption.READ)
        jFileChannel.position(jFileChannel.size())
        try {
          repeatWhile {
            val fileSize = useCloseable(FileChannel.open(path, StandardOpenOption.READ)): fileChannel =>
              fileChannel.size()
            if jFileChannel.position() > fileSize then
              false
            else
              val buf = ByteBuffer.allocate(chunkSize)
              val readBytes = jFileChannel.read(buf)
              if readBytes <= 0 then
                sleep(100.millis)
              else if readBytes > 0 then
                chunks.send(
                  Chunk.fromArray(
                    if readBytes == chunkSize then buf.array
                    else buf.array.take(readBytes)
                  )
                )
              true
          }
        } catch case NonFatal(e) => chunks.errorOrClosed(e).discard
        finally
          try jFileChannel.close()
          catch
            case NonFatal(closeException) =>
              chunks.errorOrClosed(closeException).discard
      }
    }
    chunks

... and it works (in the sense that it gets terminated) when I use NonFatal(e). Otherwise it just keeps going, as it's a daemon thread. The proper fix (I think) is to check the status of the channel in a repeatWhile instead of using forever. The purpose of that outer loop is to handle log file rotations.

This modified version works with Control-C:

  def tail(path: Path, chunkSize: Int = 1024)(using Ox, StageCapacity, IO): Source[Chunk[Byte]] =
    if Files.isDirectory(path) then throw new IOException(s"Path $path is a directory")
    val chunks = StageCapacity.newChannel[Chunk[Byte]]

    fork {
      repeatWhile {
        println("tail")
        val jFileChannel =
          try FileChannel.open(path, StandardOpenOption.READ)
          catch
            case _: UnsupportedOperationException =>
              // Some file systems don't support file channels
              Files.newByteChannel(path, StandardOpenOption.READ)
        jFileChannel.position(jFileChannel.size())
        try {
          repeatWhile {
            val fileSize = useCloseable(FileChannel.open(path, StandardOpenOption.READ)): fileChannel =>
              fileChannel.size()
            if jFileChannel.position() > fileSize then
              false
            else
              val buf = ByteBuffer.allocate(chunkSize)
              val readBytes = jFileChannel.read(buf)
              if readBytes <= 0 then
                sleep(100.millis)
              else if readBytes > 0 then
                chunks.send(
                  Chunk.fromArray(
                    if readBytes == chunkSize then buf.array
                    else buf.array.take(readBytes)
                  )
                )
              true
          }
        } catch case e => chunks.errorOrClosed(e).discard
        finally
          try jFileChannel.close()
          catch
            case NonFatal(closeException) =>
              chunks.errorOrClosed(closeException).discard
        !chunks.isClosedForReceive
      }
    }
    chunks

All is well, no need for this PR. Thanks for merging the other one!

That forever does stop the daemon thread from being interrupted, but I'm not sure I'd call it unexpected. Maybe a little bit? I'll leave this PR open for you to close, just so you can acknowledge seeing this last comment.

@wjoel (Contributor, Author) commented Aug 5, 2024

I do wonder if your startProcesses() example in https://softwaremill.com/direct-style-bootzooka-2024-update/ handles interruption...? It looks very similar to the issue I was having with my own fork-forever and catching all exceptions.

@adamw (Member) commented Aug 5, 2024

This definition of tail should be fine (though an unrelated question - why is the outer forever there, which opens the channel - don't we want to do that only once?). The question is, where's the code that is consuming the chunks that are returned from the tail method? Because ultimately there should be a top-level non-daemon thread where this is consumed. And it's that process that will get interrupted first, and which will cause the entire scope to end.

@adamw (Member) commented Aug 5, 2024

I do wonder if your startProcesses() example in https://softwaremill.com/direct-style-bootzooka-2024-update/ handles interruption...? It looks very similar to the issue I was having with my own fork-forever and catching all exceptions.

Ah of course! There should be NonFatal(e). Fixed - thanks!

The streaming code is different - there, the catch is always outside the loop. So in case of an exception, the loop ends, and only then we propagate the exception to the stream. But the fork ends after that.
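
To make the placement difference concrete (a sketch only - readNextChunk is a made-up helper, chunks is a channel as in your snippets):

  // Catch outside the loop (the stream-operator style): one failure ends the
  // loop, the error is forwarded once, and the fork terminates.
  fork {
    try
      repeatWhile {
        chunks.send(readNextChunk())
        true
      }
    catch case t: Throwable => chunks.errorOrClosed(t).discard
  }

  // Catch inside a forever loop: the error (including an interrupt, if
  // everything is caught) is forwarded, but the loop starts over, so the
  // daemon fork never ends.
  fork {
    forever {
      try chunks.send(readNextChunk())
      catch case t: Throwable => chunks.errorOrClosed(t).discard
    }
  }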

@wjoel (Contributor, Author) commented Aug 5, 2024

This definition of tail should be fine (though an unrelated question - why is the outer forever there, which opens the channel - don't we want to do that only once?).

The Ox Channel is opened only once, in val chunks = StageCapacity.newChannel[Chunk[Byte]]. I want to open the FileChannel / SeekableByteChannel more than once, if the file is rotated. The .foreach on the Source does seem to get interrupted, but the fork-forever keeps going.

It seems .send on a closed (Ox) Channel doesn't fail, or if it does fail, the exception just gets propagated (again, and now there's no one listening) by catch case e => chunks.errorOrClosed(e).discard. So perhaps errorOrClosed(e).discard should rethrow e if the channel is already closed, if it doesn't already? Not sure, but something seems off here.

The question is, where's the code that is consuming the chunks that are returned from the tail method? Because ultimately there should be a top-level non-daemon thread where this is consumed. And it's that process that will get interrupted first, and which will cause the entire scope to end.

It's only this - watchAndPrintStats is essentially just a .foreach on the Source (no further forking or anything, just straightforward Scala):

  def run(args: Vector[String])(using Ox, IO): ExitCode =
    supervised:
      IO.unsafe:
        val logLines = tail(warningsLog).myLines(StandardCharsets.UTF_8).map(_.strip)
        watchAndPrintStats(logLines)
    ExitCode.Success

@adamw (Member) commented Aug 6, 2024

The .foreach on the Source does seem to get interrupted, but the fork-forever keeps going.

Ah! I'm re-reading your code now, and if you have the try-catch inside the forever then yes, you should only catch NonFatal exceptions. That's always the safer thing to do. The idea behind error handling in stream operators is that when an exception occurs, we propagate it to the channel and end the fork. The background for this is somewhat explained in ADR#1 and ADR#3. This allows for non-throwing operations on channels.

But if an exception is thrown inside a stream operator, that's fine as well - the only thing that might happen is that the forks might end for a different reason and in a different order.

It seems .send on a closed (Ox) Channel doesn't fail, or if it does fail, the exception just gets propagated (again, and now there's no one listening) by catch case e => chunks.errorOrClosed(e).discard. So perhaps errorOrClosed(e).discard should rethrow e if the channel is already closed, if it doesn't already? Not sure, but something seems off here.

The throws / doesn't-throw distinction is exactly the difference between .error and .errorOrClosed. The first throws, the second doesn't, but instead returns the possibly already-existing error state as a value. So this should be fine, as long as the fork in question finishes.
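
Roughly (a sketch, assuming .error takes the exception the same way .errorOrClosed does in the snippets above):

  def errorDemo()(using StageCapacity): Unit =
    val c = StageCapacity.newChannel[Int]
    // .error closes the channel with an error, and throws if the channel is already closed
    c.error(new RuntimeException("boom"))
    // .errorOrClosed never throws: the channel is already closed here, so the
    // closed state comes back as a value, which we discard
    c.errorOrClosed(new RuntimeException("later")).discard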

Now I'm thinking, maybe we should have some abstraction for a fork-and-propagate scenario, which we can reuse in the stream operators. It might be less error prone then.
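
Something along these lines, perhaps (just a sketch of the idea - the name forkPropagate is made up here, not existing API):

  // Runs the body in a fork; any non-fatal exception is forwarded to the given
  // sink instead of failing the enclosing scope.
  def forkPropagate[T](sink: Sink[T])(body: => Unit)(using Ox): Fork[Unit] =
    fork {
      try body
      catch case NonFatal(e) => sink.errorOrClosed(e).discard
    }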

I'm closing this PR for now, but if something still doesn't work, please let us know :)

@adamw closed this Aug 6, 2024
@adamw (Member) commented Aug 6, 2024

See: #191

@wjoel deleted the catch-nonfatal-in-lines branch August 15, 2024