Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-20400][e2e] Migrate test_streaming_sql.sh #24776

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

morazow
Copy link

@morazow morazow commented May 13, 2024

What is the purpose of the change

Migrate test_streaming_sql.sh bash based end-to-end tests to docker based FlinkContainers framework.

Brief change log

  • Add Java ITCase using FlinkContainers for test_streaming_sql.sh logic
  • Implement helper methods in FlinkContainers to obtain job submission results from taskmanagers
  • Remove test_streaming_sql.sh from e2e tests

Verifying this change

  • Integration tests to check the streaming job results
  • Integration and unit tests to validate the helper methods

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

@morazow morazow changed the title Mor stream sql itcase [FLINK-20400][e2e] Migrate test_streaming_sql.sh May 13, 2024
@flinkbot
Copy link
Collaborator

flinkbot commented May 13, 2024

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Copy link
Contributor

@jeyhunkarimov jeyhunkarimov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @morazow thanks for the PR. I quickly went through. Overall looks good to me. I just put some comments. PTAL. Thanks!

swapPlannerScalaWithPlannerLoader();
}

private void swapPlannerLoaderWithPlannerScala() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can invoke this with @BeforeAll instead of doing in constructor?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be fine. Here I wanted to ensure that it happens before creating flink-containers since it uses the flink folder to create job and task managers. So swap should happen before that.

But I think constructor is after the BeforeAll method, so should be fine, I will check

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added also test that checks correct planner jar is used

private static final String DIST_DIR = System.getProperty("distDir");

@Override
protected FlinkContainers createFlinkContainers() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to ensure that DIST_DIR is not null and is not empty?

.collect(Collectors.toList());
if (files.size() != 1) {
throw new IllegalStateException(
"Found multiple file pattern '" + filePattern + "', expected only one.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can also contain zero patterns no?
Maybe we can improve the error message sth like "Found " + file.size() " patterms, expected only one."?

final BufferedReader bufferedReader = new BufferedReader(new StringReader(input));
final List<String> lines = new ArrayList<>();
String inputLine;
while ((inputLine = bufferedReader.readLine()) != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe use sth like this to avoid possible leaks?

try (BufferedReader bufferedReader = new BufferedReader(new StringReader(input))) {...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes good catch, addressed


@Test
void testJobManagerWithBaseImage() throws ImageBuildException {
final FlinkContainersSettings settings =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You already have settings and conf as a instance variables, maybe you can also make two of them (as instance variable) with explicit names?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh yes correct, I'd rename the local ones since they are different just for this test case

.build())
.build();

@BeforeAll
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need a cleanup with @AfterAll ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think not, since the it tagged with @RegisterExtension, the FlinkContainers is stopped in the afterall callback, so setup files inside the taskmanagers will be purged with container shutdown

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
3 participants