Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

PQ: Event containing 20+MB String cannot be deserialized for processing #16683

Closed
yaauie opened this issue Nov 18, 2024 · 2 comments · Fixed by #16719
Closed

PQ: Event containing 20+MB String cannot be deserialized for processing #16683

yaauie opened this issue Nov 18, 2024 · 2 comments · Fixed by #16719

Comments

@yaauie
Copy link
Member

yaauie commented Nov 18, 2024

Logstash information:

Please include the following information:

  1. Logstash version (e.g. bin/logstash --version) 8.x, 8.15, main
  2. Logstash installation source (e.g. built from source, with a package manager: DEB/RPM, expanded from tar or zip archive, docker) N/A
  3. How is Logstash being run (e.g. as a service/service manager: systemd, upstart, etc. Via command line, docker/kubernetes) Any

Plugins installed: (bin/logstash-plugin list --verbose): N/A

JVM (e.g. java -version): Any

OS version (uname -a if on a Unix-like system): Any

Description of the problem including expected versus actual behavior:

Steps to reproduce:

  1. enable the PQ
    echo 'queue.type: persisted' > config/logstash.yml`
    
  2. generate a 20MB+ newline-terminated string
    (dd if=<(base64 < /dev/urandom) bs=1K count="$(expr 20 '*' 1024)"; echo) > big.txt
    
  3. run logstash in a way where the input plugin will generate an event containing the aforementioned 20MB+ string:
    bin/logstash -e 'input { stdin { codec => plain } } output { stdout { codec => dots } }' < big.txt 
    

Provide logs (if relevant):

(using improved pipeline cause-chain logging from #16677):

Using system java: /Users/rye/.jenv/shims/java
Sending Logstash logs to /Users/rye/src/elastic/logstash@main/logs which is now configured via log4j2.properties
[2024-11-18T19:13:30,831][INFO ][logstash.runner          ] Log4j configuration path used is: /Users/rye/src/elastic/logstash@main/config/log4j2.properties
[2024-11-18T19:13:30,834][WARN ][logstash.runner          ] The use of JAVA_HOME has been deprecated. Logstash 8.0 and later ignores JAVA_HOME and uses the bundled JDK. Running Logstash with the bundled JDK is recommended. The bundled JDK has been verified to work with each specific version of Logstash, and generally provides best performance and reliability. If you have compelling reasons for using your own JDK (organizational-specific compliance requirements, for example), you can configure LS_JAVA_HOME to use that version instead.
[2024-11-18T19:13:30,834][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"9.0.0", "jruby.version"=>"jruby 9.4.9.0 (3.1.4) 2024-11-04 547c6b150e OpenJDK 64-Bit Server VM 17.0.12+0 on 17.0.12+0 +indy +jit [arm64-darwin]"}
[2024-11-18T19:13:30,835][INFO ][logstash.runner          ] JVM bootstrap flags: [-Xms1g, -Xmx1g, -Djava.awt.headless=true, -Dfile.encoding=UTF-8, -Djruby.compile.invokedynamic=true, -XX:+HeapDumpOnOutOfMemoryError, -Djava.security.egd=file:/dev/urandom, -Dlog4j2.isThreadContextMapInheritable=true, -Dlogstash.jackson.stream-read-constraints.max-string-length=400000000, -Dlogstash.jackson.stream-read-constraints.max-number-length=10000, -Djruby.regexp.interruptible=true, -Djdk.io.File.enableADS=true, --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED, --add-opens=java.base/java.security=ALL-UNNAMED, --add-opens=java.base/java.io=ALL-UNNAMED, --add-opens=java.base/java.nio.channels=ALL-UNNAMED, --add-opens=java.base/sun.nio.ch=ALL-UNNAMED, --add-opens=java.management/sun.management=ALL-UNNAMED, -Dio.netty.allocator.maxOrder=11]
[2024-11-18T19:13:30,835][INFO ][logstash.runner          ] Jackson default value override `logstash.jackson.stream-read-constraints.max-string-length` configured to `400000000`
[2024-11-18T19:13:30,835][INFO ][logstash.runner          ] Jackson default value override `logstash.jackson.stream-read-constraints.max-number-length` configured to `10000`
[2024-11-18T19:13:30,849][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2024-11-18T19:13:31,031][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600, :ssl_enabled=>false}
[2024-11-18T19:13:31,138][INFO ][org.reflections.Reflections] Reflections took 35 ms to scan 1 urls, producing 149 keys and 522 values
[2024-11-18T19:13:31,217][INFO ][logstash.javapipeline    ] Pipeline `main` is configured with `pipeline.ecs_compatibility: v8` setting. All plugins in this pipeline will default to `ecs_compatibility => v8` unless explicitly configured otherwise.
[2024-11-18T19:13:31,227][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>12, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1500, "pipeline.sources"=>["config string"], :thread=>"#<Thread:0x10df8cf8 /Users/rye/src/elastic/logstash@main/logstash-core/lib/logstash/java_pipeline.rb:138 run>"}
[2024-11-18T19:13:31,429][INFO ][logstash.javapipeline    ][main] Pipeline Java execution initialization time {"seconds"=>0.2}
[2024-11-18T19:13:31,444][INFO ][logstash.inputs.stdin    ][main] Automatically switching from plain to line codec {:plugin=>"stdin"}
[2024-11-18T19:13:31,449][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
[2024-11-18T19:13:31,458][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2024-11-18T19:13:31,516][ERROR][logstash.javapipeline    ][main] Pipeline worker error, the pipeline will be stopped {:pipeline_id=>"main", :exception=>"Java::OrgLogstashAckedqueue::QueueRuntimeException", :error=>"deserialize invocation error", :stacktrace=>"org.logstash.ackedqueue.Queue.deserialize(Queue.java:752)\norg.logstash.ackedqueue.Batch.deserializeElements(Batch.java:89)\norg.logstash.ackedqueue.Batch.<init>(Batch.java:49)\norg.logstash.ackedqueue.Queue.readPageBatch(Queue.java:681)\norg.logstash.ackedqueue.Queue.readBatch(Queue.java:614)\norg.logstash.ackedqueue.ext.JRubyAckedQueueExt.readBatch(JRubyAckedQueueExt.java:158)\norg.logstash.ackedqueue.AckedReadBatch.create(AckedReadBatch.java:49)\norg.logstash.ext.JrubyAckedReadClientExt.readBatch(JrubyAckedReadClientExt.java:87)\norg.logstash.execution.WorkerLoop.run(WorkerLoop.java:82)\njava.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\njava.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)\njava.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.base/java.lang.reflect.Method.invoke(Method.java:569)\norg.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(JavaMethod.java:300)\norg.jruby.javasupport.JavaMethod.invokeDirect(JavaMethod.java:164)\norg.jruby.java.invokers.InstanceMethodInvoker.call(InstanceMethodInvoker.java:32)\norg.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:456)\norg.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:195)\norg.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:346)\norg.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:66)\norg.jruby.ir.interpreter.Interpreter.INTERPRET_BLOCK(Interpreter.java:118)\norg.jruby.runtime.MixedModeIRBlockBody.commonYieldPath(MixedModeIRBlockBody.java:136)\norg.jruby.runtime.IRBlockBody.call(IRBlockBody.java:66)\norg.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58)\norg.jruby.runtime.Block.call(Block.java:144)\norg.jruby.RubyProc.call(RubyProc.java:354)\norg.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:111)\njava.base/java.lang.Thread.run(Thread.java:840)", :cause=>{:exception=>"Java::JavaLangReflect::InvocationTargetException", :error=>"", :stacktrace=>"java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\njava.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)\njava.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.base/java.lang.reflect.Method.invoke(Method.java:569)\norg.logstash.ackedqueue.Queue.deserialize(Queue.java:750)\norg.logstash.ackedqueue.Batch.deserializeElements(Batch.java:89)\norg.logstash.ackedqueue.Batch.<init>(Batch.java:49)\norg.logstash.ackedqueue.Queue.readPageBatch(Queue.java:681)\norg.logstash.ackedqueue.Queue.readBatch(Queue.java:614)\norg.logstash.ackedqueue.ext.JRubyAckedQueueExt.readBatch(JRubyAckedQueueExt.java:158)\norg.logstash.ackedqueue.AckedReadBatch.create(AckedReadBatch.java:49)\norg.logstash.ext.JrubyAckedReadClientExt.readBatch(JrubyAckedReadClientExt.java:87)\norg.logstash.execution.WorkerLoop.run(WorkerLoop.java:82)\njava.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\njava.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)\njava.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.base/java.lang.reflect.Method.invoke(Method.java:569)\norg.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(JavaMethod.java:300)\norg.jruby.javasupport.JavaMethod.invokeDirect(JavaMethod.java:164)\norg.jruby.java.invokers.InstanceMethodInvoker.call(InstanceMethodInvoker.java:32)\norg.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:456)\norg.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:195)\norg.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:346)\norg.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:66)\norg.jruby.ir.interpreter.Interpreter.INTERPRET_BLOCK(Interpreter.java:118)\norg.jruby.runtime.MixedModeIRBlockBody.commonYieldPath(MixedModeIRBlockBody.java:136)\norg.jruby.runtime.IRBlockBody.call(IRBlockBody.java:66)\norg.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58)\norg.jruby.runtime.Block.call(Block.java:144)\norg.jruby.RubyProc.call(RubyProc.java:354)\norg.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:111)\njava.base/java.lang.Thread.run(Thread.java:840)", :cause=>{:exception=>"Java::ComFasterxmlJacksonCoreExc::StreamConstraintsException", :error=>"String value length (20051112) exceeds the maximum allowed (20000000, from `StreamReadConstraints.getMaxStringLength()`)", :stacktrace=>"com.fasterxml.jackson.core.StreamReadConstraints._constructException(StreamReadConstraints.java:549)\ncom.fasterxml.jackson.core.StreamReadConstraints.validateStringLength(StreamReadConstraints.java:484)\ncom.fasterxml.jackson.core.util.ReadConstrainedTextBuffer.validateStringLength(ReadConstrainedTextBuffer.java:27)\ncom.fasterxml.jackson.core.util.TextBuffer.finishCurrentSegment(TextBuffer.java:939)\ncom.fasterxml.jackson.dataformat.cbor.CBORParser._finishChunkedText(CBORParser.java:2556)\ncom.fasterxml.jackson.dataformat.cbor.CBORParser._finishTextToken(CBORParser.java:2292)\ncom.fasterxml.jackson.dataformat.cbor.CBORParser.getValueAsString(CBORParser.java:1686)\norg.logstash.ObjectMappers$RubyStringDeserializer.deserialize(ObjectMappers.java:183)\norg.logstash.ObjectMappers$RubyStringDeserializer.deserialize(ObjectMappers.java:172)\ncom.fasterxml.jackson.databind.jsontype.impl.AsArrayTypeDeserializer._deserialize(AsArrayTypeDeserializer.java:120)\ncom.fasterxml.jackson.databind.jsontype.impl.AsArrayTypeDeserializer.deserializeTypedFromAny(AsArrayTypeDeserializer.java:71)\ncom.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializerNR.deserializeWithType(UntypedObjectDeserializerNR.java:115)\ncom.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserializer.java:625)\ncom.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:449)\ncom.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:32)\ncom.fasterxml.jackson.databind.jsontype.impl.AsArrayTypeDeserializer._deserialize(AsArrayTypeDeserializer.java:120)\ncom.fasterxml.jackson.databind.jsontype.impl.AsArrayTypeDeserializer.deserializeTypedFromAny(AsArrayTypeDeserializer.java:71)\ncom.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializerNR.deserializeWithType(UntypedObjectDeserializerNR.java:115)\ncom.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserializer.java:625)\ncom.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:449)\ncom.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:32)\ncom.fasterxml.jackson.databind.jsontype.impl.AsArrayTypeDeserializer._deserialize(AsArrayTypeDeserializer.java:120)\ncom.fasterxml.jackson.databind.jsontype.impl.AsArrayTypeDeserializer.deserializeTypedFromObject(AsArrayTypeDeserializer.java:61)\ncom.fasterxml.jackson.databind.deser.std.MapDeserializer.deserializeWithType(MapDeserializer.java:492)\ncom.fasterxml.jackson.databind.deser.impl.TypeWrappedDeserializer.deserialize(TypeWrappedDeserializer.java:74)\ncom.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:342)\ncom.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4899)\ncom.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3940)\norg.logstash.Event.fromSerializableMap(Event.java:269)\norg.logstash.Event.deserialize(Event.java:551)\njava.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\njava.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)\njava.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.base/java.lang.reflect.Method.invoke(Method.java:569)\norg.logstash.ackedqueue.Queue.deserialize(Queue.java:750)\norg.logstash.ackedqueue.Batch.deserializeElements(Batch.java:89)\norg.logstash.ackedqueue.Batch.<init>(Batch.java:49)\norg.logstash.ackedqueue.Queue.readPageBatch(Queue.java:681)\norg.logstash.ackedqueue.Queue.readBatch(Queue.java:614)\norg.logstash.ackedqueue.ext.JRubyAckedQueueExt.readBatch(JRubyAckedQueueExt.java:158)\norg.logstash.ackedqueue.AckedReadBatch.create(AckedReadBatch.java:49)\norg.logstash.ext.JrubyAckedReadClientExt.readBatch(JrubyAckedReadClientExt.java:87)\norg.logstash.execution.WorkerLoop.run(WorkerLoop.java:82)\njava.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\njava.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)\njava.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.base/java.lang.reflect.Method.invoke(Method.java:569)\norg.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(JavaMethod.java:300)\norg.jruby.javasupport.JavaMethod.invokeDirect(JavaMethod.java:164)\norg.jruby.java.invokers.InstanceMethodInvoker.call(InstanceMethodInvoker.java:32)\norg.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:456)\norg.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:195)\norg.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:346)\norg.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:66)\norg.jruby.ir.interpreter.Interpreter.INTERPRET_BLOCK(Interpreter.java:118)\norg.jruby.runtime.MixedModeIRBlockBody.commonYieldPath(MixedModeIRBlockBody.java:136)\norg.jruby.runtime.IRBlockBody.call(IRBlockBody.java:66)\norg.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58)\norg.jruby.runtime.Block.call(Block.java:144)\norg.jruby.RubyProc.call(RubyProc.java:354)\norg.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:111)\njava.base/java.lang.Thread.run(Thread.java:840)"}}, :thread=>"#<Thread:0x10df8cf8 /Users/rye/src/elastic/logstash@main/logstash-core/lib/logstash/java_pipeline.rb:138 sleep>"}
[2024-11-18T19:13:32,715][INFO ][logstash.javapipeline    ][main] Pipeline terminated {"pipeline.id"=>"main"}
[2024-11-18T19:13:32,981][INFO ][logstash.pipelinesregistry] Removed pipeline from registry successfully {:pipeline_id=>:main}
[2024-11-18T19:13:32,986][INFO ][logstash.runner          ] Logstash shut down.

Notably:

exception: "Java::OrgLogstashAckedqueue::QueueRuntimeException"
error:     "deserialize invocation error"
cause:
  exception: "Java::JavaLangReflect::InvocationTargetException"
  error: ""
  cause:
    exception: "Java::ComFasterxmlJacksonCoreExc::StreamConstraintsException"
    error: "String value length (20051112) exceeds the maximum allowed (20000000, from `StreamReadConstraints.getMaxStringLength()`)"

Notably, setting -Dlogstash.jackson.stream-read-constraints.max-string-length=400000000 does NOT change the error message, indicating that the stream constraint is not applied to the specific deserializer that works with the PQ, possibly because the CBOR_MAPPER is configred before the overrides are applied.

@yaauie
Copy link
Member Author

yaauie commented Nov 20, 2024

I can confirm that the static initialization of the CBOR_MAPPER occurs berore the Jackson overrides have been applied, and have a local patch to move the Jackson overrides application code over to pure Java so that they can be deterministically applied first. This patch works, and when applied the above issue can no longer be reproduced.

The existing ruby implementation helpfully logs as it goes (and these logs are covered in specs), but since the ObjectMappers is first loaded (and therefore its components' static initializes fired) before the loggers have been configured, getting the helpful logging and validating that our overrides have taken effect will be tricky.

@yaauie
Copy link
Member Author

yaauie commented Dec 3, 2024

I suspect that our inability to reproduce has to do with the recent reversion of changes to the buffered tokenizer, which had allowed over-large payloads (after reverting, I believe they are truncated), since my repro used stdin, which "conveniently" auto-upgrades from plain to the line codec, which uses the buffered tokenizer.

Automatically switching from plain to line codec {:plugin=>"stdin"}

I'm attempting to reproduce without routing through that.

donoghuc added a commit to donoghuc/logstash that referenced this issue Dec 24, 2024
When Logstash 8.12.0 added increased Jackson stream read constraints to
jvm.options, assumptions about the existence of that file's contents
were invalidated. This led to issues like elastic#16683.

This change ensures Logstash applies defaults from config at runtime:
- MAX_STRING_LENGTH: 200_000_000
- MAX_NUMBER_LENGTH: 10_000
- MAX_NESTING_DEPTH: 1_000

These match the jvm.options defaults and are applied even when config
is missing. Config values still override these defaults when present.
# for free to join this conversation on GitHub. Already have an account? # to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

1 participant