
GroupBy not working in tests due to coder inequality #5009

Open
f-loris opened this issue Sep 21, 2023 · 4 comments
Labels
bug Something isn't working

Comments

@f-loris
Contributor

f-loris commented Sep 21, 2023

While upgrading scio from 0.12 to the latest 0.13.3 and Apache Beam from 2.41 to 2.50, I noticed that Beam's GroupByKey no longer works in tests: it fails with a complaint about coder equality.

This is probably related to apache/beam#22702, which was also mentioned here in this repo.

The following code shows the error:

import org.apache.beam.sdk.transforms.{GroupByKey, SerializableFunction, WithKeys}

class ScioGroupByKeyTest extends PipelineSpec {
  case class TestRecord(key: String, value: String)

  // fails
  it should "work correctly with beams native group by key" in {
    runWithContext { sc =>
      sc.parallelize(List(TestRecord("key1", "value1")))
        .applyKvTransform(WithKeys.of(new SerializableFunction[TestRecord, String] {
          override def apply(input: TestRecord): String = input.key
        }))
        .applyKvTransform(GroupByKey.create[String, TestRecord]())
    }
  }

  // fails
  it should "work correctly with scio's group by" in {
    runWithContext { sc =>
      sc.parallelize(List(TestRecord("key1", "value1")))
        .groupBy(_.key)
    }
  }

  // works
  it should "work correctly with scio's group by with a simple string value" in {
    runWithContext { sc =>
      sc.parallelize(List("key1"))
        .groupBy(identity)
    }
  }
}

The first two cases fail because a case class record is involved.
The exception states:

java.lang.IllegalStateException: the GroupByKey requires its output coder to be KvCoder(StringUtf8Coder,IterableCoder(RecordCoder[com.aeroficial.pipeline.scio.ScioGroupByKeyTest.TestRecord](key -> StringUtf8Coder, value -> StringUtf8Coder))) but found KvCoder(StringUtf8Coder,IterableCoder(RecordCoder[com.aeroficial.pipeline.scio.ScioGroupByKeyTest.TestRecord](key -> StringUtf8Coder, value -> StringUtf8Coder))).
	at org.apache.beam.sdk.transforms.GroupByKey.validate(GroupByKey.java:190)

Judging from the exception, the two coders are identical. While debugging I found that this is related to coder materialization: a MaterializedCoder is compared against the non-materialized value coder.

As mentioned, this only happens in tests; with the DirectRunner it works, because there the comparison involves only non-materialized coders.

Am I doing something wrong here, or is there a workaround? I already tried setting the coders manually via setCoder.

Thanks for taking a look at this issue.

@RustedBones
Contributor

This sounds familiar. Thanks for providing the test cases; I will look into it ASAP.

@RustedBones RustedBones added the bug Something isn't working label Sep 21, 2023
@RustedBones
Contributor

Here are the debugging results:
The 2nd case

  it should "work correctly with scio's group by" in {
    runWithContext { sc =>
      sc.parallelize(List(TestRecord("key1", "value1")))
        .groupBy(_.key)
    }
  }

fails with a different exception:

Found an $outer field in class com.spotify.scio.ScioGroupByKeyTest$$anonfun$14$$anonfun$apply$5$$anon$5.
Possibly it is an attempt to use inner case class in a Scio transformation. Inner case classes are not supported in Scio auto-derived macros. Move the case class to the package level or define a custom coder.

Basically, test classes are not serializable. If you move the TestRecord definition into the test class's companion object, it will work as expected.
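The hidden $outer capture behind this error can be reproduced with plain Scala and Java serialization, independent of scio and Beam; the class names below are made up for illustration:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Stand-in for a test class: not Serializable, like most test suites.
class NotSerializableSuite {
  // The compiler adds a hidden $outer field pointing at the enclosing instance.
  case class InnerRecord(key: String)
}

// Classes nested in an object need no outer reference and serialize cleanly.
object Records {
  case class OuterRecord(key: String)
}

object SerializationDemo {
  // Returns true if Java serialization succeeds for the given value.
  def trySerialize(value: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(value)
      true
    } catch {
      case _: java.io.NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val suite = new NotSerializableSuite
    // Serializing the inner record drags in the non-serializable suite via $outer.
    assert(!trySerialize(suite.InnerRecord("key1")))
    // The object-level record has no hidden outer reference.
    assert(trySerialize(Records.OuterRecord("key1")))
  }
}
```

This is the same reason the scio macro error message suggests moving the case class to the package level: the derived coder would otherwise have to serialize the enclosing test instance.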

Concerning the 1st test case

  it should "work correctly with beams native group by key" in {
    runWithContext { sc =>
      sc.parallelize(List(TestRecord("key1", "value1")))
        .applyKvTransform(WithKeys.of(new SerializableFunction[TestRecord, String] {
          override def apply(input: TestRecord): String = input.key
        }))
        .applyKvTransform(GroupByKey.create[String, TestRecord]())
    }
  }

You are right. Since Beam 2.42 there is an extra check: the aggregation step verifies that the value coder is an IterableCoder. Unfortunately, this does not play well with scio, which would not require this verification since it is type-checked, nor with scio's coder instrumentation (the MaterializedCoder wrapper helps debug failed steps with call-site info).

The workaround is to use Coder.aggregate, which changes the materialization behavior so the IterableCoder is not wrapped:

 // explicit
.applyKvTransform(GroupByKey.create[String, TestRecord]())(Coder.stringCoder, Coder.aggregate)

// implicit
implicit def groupCoder[T: Coder]: Coder[java.lang.Iterable[T]] = Coder.aggregate[T]
.applyKvTransform(GroupByKey.create[String, TestRecord]())
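To make the workaround concrete, here is a hedged sketch of the first failing test with both fixes applied (TestRecord moved out of the test class, and Coder.aggregate passed explicitly); the exact import paths may vary with the scio version:

```scala
import com.spotify.scio.coders.Coder
import com.spotify.scio.testing.PipelineSpec
import org.apache.beam.sdk.transforms.{GroupByKey, SerializableFunction, WithKeys}

// Moved out of the test class so the auto-derived coder has no $outer field.
object ScioGroupByKeyTest {
  case class TestRecord(key: String, value: String)
}

class ScioGroupByKeyTest extends PipelineSpec {
  import ScioGroupByKeyTest.TestRecord

  it should "work with Beam's native GroupByKey" in {
    runWithContext { sc =>
      sc.parallelize(List(TestRecord("key1", "value1")))
        .applyKvTransform(WithKeys.of(new SerializableFunction[TestRecord, String] {
          override def apply(input: TestRecord): String = input.key
        }))
        // Coder.aggregate keeps the Iterable value coder unwrapped, satisfying
        // the IterableCoder check GroupByKey performs since Beam 2.42.
        .applyKvTransform(GroupByKey.create[String, TestRecord]())(Coder.stringCoder, Coder.aggregate)
    }
  }
}
```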

@f-loris
Contributor Author

f-loris commented Sep 24, 2023

Thanks for taking a look, and sorry for mixing things up: the second test case failed due to a different error.

I can confirm that the workaround works fine.

I suppose this issue may also hit other users who need to work with applyKvTransform; I'm using stateful DoFns, where KV is required. Could applyKvTransform handle this automatically, or should it at least be documented somewhere?

@RustedBones
Contributor

The problem is not applyKvTransform but GroupByKey, which performs the extra validation on the value coder.
We can add an extra check: if the transform is a GroupByKey, use the aggregate coder.
