scala.MatchError: None (of class scala.None$) during CaseClassSerializer #148

Open
chollinger93 opened this issue Sep 12, 2024 · 10 comments
Labels
bug Something isn't working

Comments

@chollinger93

Direct follow-up from #106, but I figured a new issue would be helpful. Using "1.18.1_1.1.5" w/ Scala 3.4.0 on AWS' managed Flink (i.e., little control over their setup) w/ Flink 1.18.

I am seeing scala.MatchError: None (of class scala.None$) issues just like in #106, and I'm also having a hard time replicating it locally, so I can't write a test against it, nor step through it with a debugger. I think @novakov-alexey's classpath suspicion is spot on.

My flow is essentially Array[Byte] => A (via Kafka) => B (via a ProcessFunction), with both A and B being case classes w/ Option fields.
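
To illustrate the shape (a sketch with made-up names, not my actual code; assumes flink-scala-api's implicit TypeInformation derivation is in scope):

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.{Collector, OutputTag}
import org.apache.flinkx.api.serializers.*

case class A(id: String, maybe: Option[String])
case class B(id: String, maybe: Option[String])

// The Java OutputTag constructor takes an explicit TypeInformation.
val bTag = new OutputTag[B]("b-side", summon[TypeInformation[B]])

class AToB(tag: OutputTag[B]) extends ProcessFunction[A, B]:
  override def processElement(a: A, ctx: ProcessFunction[A, B]#Context, out: Collector[B]): Unit =
    val b = B(a.id, a.maybe)
    // The CaseClassSerializer copy in the stacktrace below happens while this
    // record is forwarded (copied) to the next chained operator.
    ctx.output(tag, b)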

The Kafka deserializer works flawlessly.

It seems to fail on the custom mapping step, which is a ProcessFunction[A, B]. The stacktrace points me to a ctx.output(tag, p) within the ProcessFunction, where the following happens:

Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:61)
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collectAndCheckIfChained(ChainingOutput.java:95)
	at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:93)
	at org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:103)
	at (... ctx.output)
...
Caused by: scala.MatchError: None (of class scala.None$)
	at org.apache.flinkx.api.serializer.OptionSerializer.copy(OptionSerializer.scala:50)
	at org.apache.flinkx.api.serializer.OptionSerializer.copy(OptionSerializer.scala:48)
	at org.apache.flinkx.api.serializer.CaseClassSerializer.$anonfun$2(CaseClassSerializer.scala:76)
	at org.apache.flinkx.api.serializer.CaseClassSerializer.$anonfun$adapted$2(CaseClassSerializer.scala:76)
	at scala.collection.immutable.Range.map(Range.scala:59)
	at org.apache.flinkx.api.serializer.CaseClassSerializer.copy(CaseClassSerializer.scala:76)
	at org.apache.flinkx.api.serializer.CaseClassSerializer.copy(CaseClassSerializer.scala:72)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:74)
	... 34 more

The line in question is https://github.com/flink-extended/flink-scala-api/blob/v1.18.1_1.1.5/src/main/scala/org/apache/flinkx/api/serializer/OptionSerializer.scala#L50
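
If that copy pattern matches only on Some and None (a hedged reconstruction from the stacktrace, not the repo's exact code; `inner` stands for the element serializer), then a None loaded by a different classloader matches neither case, since `case None` checks equality against this classloader's None singleton:

def copy(from: Option[A]): Option[A] = from match
  case Some(a) => Some(inner.copy(a))
  case None    => None // a foreign-classloader None matches neither case => scala.MatchError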

In the past, my A => B map has failed on something seemingly simple like

case class Foo(bar: Option[Bar])
case class Bar(id: String, baz: Option[String])
// imagine
val x = Foo(Some(Bar("ID", None)))

So I resorted to (essentially):

val x: Foo = ???
val y: Foo = x.bar match
  case Some(b) => x.copy(bar = Some(b))
  case _       => x.copy(bar = None)

(Note the match on _, rather than matching on None)

This fixed the issue and provided the downstream processing with an actual None value.

Today, a new message type forced me to deal with an even more deeply nested case class that was partially None. I'll spare you the full example, but doing basically this

val x: Foo = ???
val y: Foo = x.bar match
  case Some(b) => x.copy(b.couldBeNone match
    case Some(c) => ... // you get the idea

did work.

This would support the theory that, unless you manually rewrite every None value, something is making it so that None != None. The only way I can "explain" that (using the term liberally) is "something something classpath".

I haven't read through the OptionSerializer code in this repo in its entirety, but one could argue for pattern matching against _ there too.

If I get the time I'll do more digging, but for now my super-cursed pattern matching against _ works around this (I think?).
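
To make the None != None theory concrete, here is a self-contained sketch of the effect (the scala-library JAR path is an assumption; illustrative only):

import java.io.File
import java.net.URLClassLoader

@main def noneVsNone(): Unit =
  // Load scala.None$ through a second classloader whose only parent is the
  // bootstrap loader, then compare its singleton to our own None.
  val jar = new File("scala-library.jar") // assumed path to a Scala stdlib JAR
  val foreignLoader = new URLClassLoader(Array(jar.toURI.toURL), null)
  val foreignNone = foreignLoader.loadClass("scala.None$").getField("MODULE$").get(null)
  println(foreignNone == None) // false: two distinct singletons, reference equality

A match arm written as case None would miss such a foreign None, while case _ catches it.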

@novakov-alexey
Collaborator

Hi @chollinger93, thank you for the detailed investigation and the spotted workaround. Do I understand correctly that you can reproduce this issue consistently?

I think it really needs to be tried in a Flink app/job cluster environment. It probably can't be spotted in Flink local mode, that is, within a single JVM.

One question: when you run on AWS Flink, do you remove Flink's Scala JAR somehow?
The official recipe is to remove the Scala JAR from the flink/lib folder. Not sure if that's feasible on AWS.

Another option to suppress Flink's old Scala is to use this property: classloader.parent-first-patterns.default. It works fine for me so far. Please check the idea here: https://ververica.zendesk.com/hc/en-us/articles/13583223554460-How-to-run-my-Flink-job-with-Scala-2-13-and-Scala-3-version-in-VVP
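
For reference, this amounts to setting the property to Flink's default list with the scala. entry removed, e.g. (a hedged example; double-check the defaults of your Flink version):

classloader.parent-first-patterns.default: java.;org.apache.flink.;com.esotericsoftware.kryo;org.apache.hadoop.;javax.annotation.;org.xml;javax.xml;org.apache.xerces;org.w3c;org.rocksdb.;org.slf4j;org.apache.log4j;org.apache.logging;org.apache.commons.logging;ch.qos.logback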

@chollinger93
Author

Yes, this is reproducible, provided you get a message that has a None field that isn't covered by my "workaround".

I don't think you can remove jars from the AWS Flink runners - I think it's all managed K8s pods with no custom init actions.

The link you provided is a good idea. Unfortunately, I don't think the AWS runner gives me access to that property (or anything in the cluster config, outside of raising a support ticket for some of them) and setting classloader.parent-first-patterns.default programmatically isn't really an option.

@novakov-alexey
Collaborator

Got it. I will try your code example on one of my session clusters (with and without Flink's Scala around).

It would be great if you could verify on AWS whether changing the classloader.parent-first-patterns.default property helps. Yes, unfortunately AWS requires raising a ticket for that.

@novakov-alexey
Collaborator

novakov-alexey commented Oct 4, 2024

Hi @chollinger93 ,

I've tried to reproduce your scenario with an in-memory data generator, but I could not get the error in the Scala Option serializer.
Could you check if this project's code is similar to your scenario? https://github.com/novakov-alexey/flink-scala-option
I suppress Flink's embedded Scala by removing the scala package from this Flink configuration property:

classloader.parent-first-patterns.default
java.;org.apache.flink.;com.esotericsoftware.kryo;org.apache.hadoop.;javax.annotation.;org.xml;javax.xml;org.apache.xerces;org.w3c;org.rocksdb.;org.slf4j;org.apache.log4j;org.apache.logging;org.apache.commons.logging;ch.qos.logback

Alternatively, could you give me a complete example of a Flink job which I could build and deploy on any Flink-based platform?

@chollinger93
Author

Hey, sorry for not responding earlier. I have it on my backlog to try Flink via K8s/locally, but haven't gotten around to it yet. Setting classloader.parent-first-patterns.default via AWS support seems like too big of an operational burden, assuming we won't have a way to self-service it.

Your example is pretty spot on, assuming FakeProcessFunction runs on a different executor than the source.

My real code at work has the added complexity of using protobuf, but scalapb generates regular case classes, so I don't think that should have any impact.

I can see if I can put together a minimal example soon.

Sorry about the delays on my end - really appreciate you looking into this!

@chollinger93
Author

I spent the morning trying to get a minimal example to work (and by "work", I mean "get it to break"), but I'm afraid between us having multiple modules, Kafka, protobuf (and using protobuf to parse from raw []byte), various dependencies, and our build.sbt (that is being a bit too clever about caching), I'm not able to reproduce it in isolation and had to timebox it. I wrote about a bit of that complexity here, in case you're curious.

That being said, I did try your sbt adjustments and I believe the missing piece was ExclusionRule(organization = "org.scalameta").

While poking around the JAR, I saw coursierapi/shaded/scala/MatchError.class and others being included. They come from org.scalameta. A diff between two assemblies, with and without that ExclusionRule, yields 2120 classes now excluded.

On a related note, I don't think we can actually use assemblyPackageScala / assembleArtifact, since AWS' Flink doesn't appear to include the Scala standard library, so that would yield Caused by: java.util.concurrent.CompletionException: java.lang.NoClassDefFoundError: scala/MatchError. It is within the realm of possibility that our dependencies.scala is messing something up there, but it doesn't seem to be necessary anyway. I don't think I've ever used that rule, so maybe I'm misunderstanding it.

Here are the relevant sections:

def excludeJars(cp: Classpath) = cp filter { f =>
  // A JAR is excluded from the assembly when its file name matches
  // any of these patterns.
  Set(
    "scala-asm-.+-scala-1\\.jar",
    "interface-.+\\.jar",
    "jline-terminal-.+\\.jar",
    "jline-reader-.+\\.jar",
    "jline-.+\\.jar",
    "scala-compiler-.+\\.jar",
    "scala3-compiler_.+\\.jar",
    "flink-shaded-zookeeper-.+\\.jar",
    "flink-shaded-jackson-.+\\.jar",
    "annotations-.+\\.jar"
  ).exists(p => f.data.getName.matches(p))
}

lazy val commonSettings = Seq(
  // ...
  assembly / assemblyExcludedJars := {
    val cp = (assembly / fullClasspath).value
    excludeJars(cp)
  }
)

// dependencies.scala
"org.apache.flink" % "flink-core" % V.flink,
("org.flinkextended" % "flink-scala-api_3" % V.flinkScala)
  .excludeAll(
    ExclusionRule(organization = "org.apache.flink"),
    ExclusionRule(organization = "org.scalameta")
  )
  .exclude("com.thesamet.scalapb", "lenses_2.13")
  .exclude("com.thesamet.scalapb", "scalapb-runtime_2.13"),

If you have a docs page/repo anywhere, I'd be happy to contribute there to make this easier to discover for future AWS users.

@novakov-alexey added the bug label Oct 14, 2024
@novakov-alexey
Collaborator

novakov-alexey commented Oct 14, 2024

  1. Thanks for sharing the blog post. It looks very interesting; I will definitely read it.
  2. Do I understand correctly that the issue goes away once the "org.scalameta" package is excluded in the assembly phase, and it no longer happens for you on AWS?
  3. Indeed, I usually exclude those JARs in the Set of the excludeJars function to get a much smaller assembly JAR; otherwise it is insanely big.
  4. Nice approach to use regexes for the JAR names, I did not know it supports that. Thanks for the hint!
  5. I use assemblyPackageScala / assembleArtifact := false, because I attach the Scala standard library manually as additional dependencies, i.e. as separate JAR files on the job classpath (see the sketch after this list).
    Scala libraries:
  • https://mvnrepository.com/artifact/org.scala-lang/scala3-library_3/3.4.1
  • https://mvnrepository.com/artifact/org.scala-lang/scala-library/2.13.13
    It works flawlessly this way. One more important setting for this approach is to remove the "scala" package from the "classloader.parent-first-patterns.default" Flink property. As for AWS, I think it should work there as well, if you get AWS support to set this property for you. Otherwise, you have to include the Scala standard library in your assembly JAR. By the way, Flink 2.x promises to remove Scala 2.12. If that happens, I think we won't need to change the "classloader.parent-first-patterns.default" property anymore, and supporting a newer user Scala version will be simpler.
  6. Of course I support the idea of contributing docs to the repo. What would be the main idea of the issue documentation? Just for me to understand the right way to document that.
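
A minimal build.sbt sketch of that setup (a sketch, assuming sbt-assembly; the two Scala stdlib JARs linked above are then attached to the job classpath separately):

// build.sbt sketch: do not bundle the Scala stdlib into the assembly JAR.
// Pair this with removing "scala" from classloader.parent-first-patterns.default
// and shipping scala-library / scala3-library_3 as separate JARs (links above).
assemblyPackageScala / assembleArtifact := false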

@chollinger93
Author

I'm afraid this caused some premature celebration - after letting this job run in our dev environment for a while, I just got a ping about messages being dropped due to the same error from the original post.

I'm going to have to table the investigation here. Looking at the full rendered dependency graph, I don't see anything blatantly wrong with my JAR, so I must assume that, without control over the Flink classpath in the AWS runner, there's nothing else I can do.

@novakov-alexey
Collaborator

If you have a chance to try your job with the Apache Flink Kubernetes Operator or the Ververica Community edition, that would be another option to confirm whether classpath control is indeed the root cause of the problem.

@chollinger93
Author

I will try that in the next few weeks, when I get a chance to set it up on a K8s cluster!
