Skip to content

Commit

Permalink
[Python]fix hybird stream test (#66)
Browse files Browse the repository at this point in the history
* fix hybird stream test

* hybirdtest

* make workaround for hybirdstream test

* fix pytest current all of running tests

* check python test exit code

* add java 11 compatible compiler arguments for sun.ni package

* lint
  • Loading branch information
ashione committed Apr 21, 2024
1 parent 23d3a91 commit 6656df7
Show file tree
Hide file tree
Showing 24 changed files with 112 additions and 37 deletions.
1 change: 1 addition & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ Build
----------------

Build from source code :

- Build a docker using docker/Dockerfile-env
- Execute `scripts/install.sh`

Expand Down
9 changes: 6 additions & 3 deletions streaming/buildtest.sh
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,12 @@ function test_streaming_python()
fi
#python3 -m pytest $script_dir/python/raystreaming/tests/simple --capture=no
bazel build java:streaming_java_pkg
python3 -m pytest "$script_dir"/python/raystreaming/tests/ > "$TMP_LOG_OUTPUT"/python-test/python-test.log 2>&1
zip_and_upload_log "$TMP_LOG_OUTPUT"/python-test/ "${script_dir}/${ZIP_FILE}" "/${GITHUB_SHA}/${TIME}/${ZIP_FILE}"
exit $?
python3 -m pytest "$script_dir"/python/raystreaming/tests/ # > "$TMP_LOG_OUTPUT"/python-test/python-test.log 2>&1
exit_code=$?
echo "Running python test exit code : ${exit_code}"
echo "[Disabled] Uploding output to remote file."
#zip_and_upload_log "$TMP_LOG_OUTPUT"/python-test/ "${script_dir}/${ZIP_FILE}" "/${GITHUB_SHA}/${TIME}/${ZIP_FILE}"
exit $exit_code

popd || exit
}
Expand Down
17 changes: 17 additions & 0 deletions streaming/java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@

<properties>
<java.version>1.8</java.version>
<java.new.version>11</java.new.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<ray.version>2.9.3</ray.version>
<streaming.version>0.0.1</streaming.version>
Expand Down Expand Up @@ -128,6 +129,22 @@
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>${java.new.version}</source>
<target>${java.new.version}</target>
<encoding>${project.build.sourceEncoding}</encoding>
<compilerArgs>
<arg>--add-modules=jdk.unsupported</arg>
<arg>--add-exports=java.base/sun.nio.ch=ALL-UNNAMED</arg>
</compilerArgs>
</configuration>
</plugin>


<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public static StreamingContext buildContext() {
public void execute(String jobName) {
JobGraphBuilder jobGraphBuilder = new JobGraphBuilder(this.streamSinks, jobName);
JobGraph originalJobGraph = jobGraphBuilder.build();
originalJobGraph.printJobGraph();
this.jobGraph = new JobGraphOptimizer(originalJobGraph).optimize();
jobGraph.printJobGraph();
LOG.info("JobGraph digraph\n{}", jobGraph.generateDigraph());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class PythonPartitionFunction implements Partition<Object> {
public static final PythonPartitionFunction KeyPartition =
new PythonPartitionFunction("raystreaming.partition", "KeyPartition");
public static final PythonPartitionFunction RoundRobinPartition =
new PythonPartitionFunction("raystreaming.partition", "RoundRobinPartitionFunction");
new PythonPartitionFunction("raystreaming.partition", "RoundRobinPartition");
public static final String FORWARD_PARTITION_CLASS = "ForwardPartition";
public static final PythonPartitionFunction ForwardPartition =
new PythonPartitionFunction("raystreaming.partition", FORWARD_PARTITION_CLASS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ public int[] partition(T value, int currentIndex, int numPartition) {

@Override
public int[] partition(T record, int numPartition) {
// TODO
return new int[0];
seq = (seq + 1) % numPartition;
partitions[0] = seq;
return partitions;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public void setFunction(F function) {

@Override
public void open(List<Collector> collectorList, RuntimeContext runtimeContext) {
LOG.info("Abstract {}, {} open : {}.", this.getId(), this.getName(), collectorList.size());
this.collectorList = collectorList;
this.runtimeContext = runtimeContext;
if (runtimeContext != null && runtimeContext.getOpConfig() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public ChainedOperator(
@Override
public void open(List<Collector> collectorList, RuntimeContext runtimeContext) {
// Dont' call super.open() as we `open` every operator separately.
LOG.info("chainedOperator open.");
LOG.info("ChainedOperator open.");
for (int i = 0; i < operators.size(); i++) {
StreamOperator operator = operators.get(i);
List<Collector> succeedingCollectors = new ArrayList<>();
Expand All @@ -77,6 +77,14 @@ public void open(List<Collector> collectorList, RuntimeContext runtimeContext) {
(collector.getId() == operator.getId()
&& collector.getDownStreamOpId() == subOperator.getId()))
.collect(Collectors.toList()));
// FIXME(lingxuan.zlx): Workaround for edge mismatch, see more detail from
// https://github.com/ray-project/mobius/issues/67.
if (succeedingCollectors.isEmpty()) {
succeedingCollectors.addAll(
collectorList.stream()
.filter(x -> (x.getDownStreamOpId() == subOperator.getId()))
.collect(Collectors.toList()));
}
}
});
operator.open(succeedingCollectors, createRuntimeContext(runtimeContext, i));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.ray.streaming.runtime.util;
package io.ray.streaming.common.utils;

import com.google.common.base.Preconditions;
import java.io.File;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.google.common.collect.Sets;
import com.sun.jna.NativeLibrary;
import io.ray.runtime.util.BinaryFileUtil;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ public class OutputCollector implements Collector<Record> {

private static final Logger LOGGER = LoggerFactory.getLogger(OutputCollector.class);

/** Collector id belongs to source id of edge. */
private final Integer collectorId;

/** DownStream id belongs to target id of edge. */
private final Integer downStreamId;

private final DataWriter writer;
private final ChannelId[] outputQueues;
private final Collection<BaseActorHandle> targetActors;
Expand All @@ -29,10 +35,14 @@ public class OutputCollector implements Collector<Record> {
private final Serializer crossLangSerializer = new CrossLangSerializer();

public OutputCollector(
Integer collectorId,
Integer downStreamId,
DataWriter writer,
Collection<String> outputChannelIds,
Collection<BaseActorHandle> targetActors,
Partition partition) {
this.collectorId = collectorId;
this.downStreamId = downStreamId;
this.writer = writer;
this.outputQueues = outputChannelIds.stream().map(ChannelId::from).toArray(ChannelId[]::new);
this.targetActors = targetActors;
Expand All @@ -41,7 +51,7 @@ public OutputCollector(
.map(actor -> actor instanceof PyActorHandle ? Language.PYTHON : Language.JAVA)
.toArray(Language[]::new);
this.partition = partition;
LOGGER.debug(
LOGGER.info(
"OutputCollector constructed, outputChannelIds:{}, partition:{}.",
outputChannelIds,
this.partition);
Expand Down Expand Up @@ -78,4 +88,14 @@ public void collect(Record record) {
}
}
}

@Override
public int getId() {
return collectorId;
}

@Override
public int getDownStreamOpId() {
return downStreamId;
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.ray.streaming.runtime.transfer;

import io.ray.runtime.util.BinaryFileUtil;
import io.ray.runtime.util.JniUtils;
import io.ray.streaming.common.utils.JniUtils;

/**
* TransferHandler is used for handle direct call based data transfer between workers.
Expand All @@ -10,8 +9,8 @@
public class TransferHandler {

static {
JniUtils.loadLibrary(BinaryFileUtil.CORE_WORKER_JAVA_LIBRARY, true);
io.ray.streaming.common.utils.JniUtils.loadLibrary("streaming_java");
JniUtils.loadLibrary(io.ray.runtime.util.BinaryFileUtil.CORE_WORKER_JAVA_LIBRARY, true);
JniUtils.loadLibrary("streaming_java");
}

private long writerClientNative;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class JobWorker implements Serializable {
private static final byte[] NOT_READY_FLAG = new byte[4];

static {
EnvUtil.loadNativeLibraries();
// EnvUtil.loadNativeLibraries();
}

/** JobWorker runtime context state. Used for creating stateful operator like reduce operator. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.ray.streaming.api.collector.Collector;
import io.ray.streaming.api.context.RuntimeContext;
import io.ray.streaming.api.partition.Partition;
import io.ray.streaming.common.tuple.Tuple2;
import io.ray.streaming.runtime.config.worker.WorkerInternalConfig;
import io.ray.streaming.runtime.context.ContextBackend;
import io.ray.streaming.runtime.core.checkpoint.OperatorCheckpointInfo;
Expand Down Expand Up @@ -190,8 +191,15 @@ private void openProcessor() {
Map<String, List<String>> opGroupedChannelId = new HashMap<>();
Map<String, List<BaseActorHandle>> opGroupedActor = new HashMap<>();
Map<String, Partition> opPartitionMap = new HashMap<>();
Map<String, Tuple2<Integer, Integer>> opIdAndDownStreamIdMap = new HashMap<>();
for (int i = 0; i < outputEdges.size(); ++i) {
ExecutionEdge edge = outputEdges.get(i);
LOG.info(
"Upstream {} {}, downstream {} {}.",
edge.getSource().getExecutionVertexName(),
edge.getSource().getOperator().getId(),
edge.getTargetExecutionJobVertexName(),
edge.getTarget().getOperator().getId());
String opName = edge.getTargetExecutionJobVertexName();
if (!opPartitionMap.containsKey(opName)) {
opGroupedChannelId.put(opName, new ArrayList<>());
Expand All @@ -202,13 +210,19 @@ private void openProcessor() {
.get(opName)
.add(new ArrayList<>(executionVertex.getChannelIdOutputActorMap().values()).get(i));
opPartitionMap.put(opName, edge.getPartition());
opIdAndDownStreamIdMap.put(
opName,
Tuple2.of(
edge.getSource().getOperator().getId(), edge.getTarget().getOperator().getId()));
}
opPartitionMap
.keySet()
.forEach(
opName -> {
collectors.add(
new OutputCollector(
opIdAndDownStreamIdMap.get(opName).f0,
opIdAndDownStreamIdMap.get(opName).f1,
writer,
opGroupedChannelId.get(opName),
opGroupedActor.get(opName),
Expand All @@ -217,7 +231,10 @@ private void openProcessor() {

RuntimeContext runtimeContext =
new StreamingTaskRuntimeContext(executionVertex, lastCheckpointId);

for (Collector collector : collectors) {
LOG.info(
"Collector id {}, downstream id {}.", collector.getId(), collector.getDownStreamOpId());
}
processor.open(collectors, runtimeContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ public void testHybridDataStream() throws Exception {
streamSource
.map(x -> x + x)
.asPythonStream()
.map("ray.streaming.tests.test_hybrid_stream", "map_func1")
.filter("ray.streaming.tests.test_hybrid_stream", "filter_func1")
.map("raystreaming.tests.test_hybrid_stream", "map_func1")
.filter("raystreaming.tests.test_hybrid_stream", "filter_func1")
.asJavaStream()
.sink(
(SinkFunction<Object>)
Expand Down
8 changes: 5 additions & 3 deletions streaming/python/raystreaming/examples/wordcount.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@

import ray
import wikipedia
from ray.streaming import StreamingContext
from ray.streaming.config import Config

# from ray.streaming import StreamingContext
from raystreaming import StreamingContext
from raystreaming.config import Config

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
Expand Down Expand Up @@ -75,7 +77,7 @@ def splitter(line):
.build()
)
# A Ray streaming environment with the default configuration
ctx.set_parallelism(1) # Each operator will be executed by two actors
# ctx.set_parallelism(1) # Each operator will be executed by two actors

# Reads articles from wikipedia, splits them in words,
# shuffles words, and counts the occurrences of each word.
Expand Down
4 changes: 2 additions & 2 deletions streaming/python/raystreaming/tests/simple/test_function.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from ray.streaming import function
from ray.streaming.runtime import gateway_client
from raystreaming import function
from raystreaming.runtime import gateway_client


def test_get_simple_function_class():
Expand Down
8 changes: 4 additions & 4 deletions streaming/python/raystreaming/tests/simple/test_operator.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from ray.streaming import function
from ray.streaming import operator
from ray.streaming.operator import OperatorType
from ray.streaming.runtime import gateway_client
from raystreaming import function
from raystreaming import operator
from raystreaming.operator import OperatorType
from raystreaming.runtime import gateway_client


def test_create_operator_with_func():
Expand Down
8 changes: 4 additions & 4 deletions streaming/python/raystreaming/tests/test_direct_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
import time

import ray
import ray.streaming._streaming as _streaming
import ray.streaming.runtime.transfer as transfer
from raystreaming import _streaming
import raystreaming.runtime.transfer as transfer
from ray._raylet import PythonFunctionDescriptor
from ray.streaming.config import Config
from raystreaming.config import Config
import pytest


Expand Down Expand Up @@ -102,7 +102,7 @@ def on_writer_message_sync(self, buffer: bytes):
return result.to_pybytes()


@pytest.mark.skip(reason="Waitting to fix")
# @pytest.mark.skip(reason="Waitting to fix")
def test_queue():
ray.init()
writer = Worker._remote()
Expand Down
2 changes: 1 addition & 1 deletion streaming/python/raystreaming/tests/test_failover.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import ray
import pytest
from ray.streaming import StreamingContext
from raystreaming import StreamingContext


@pytest.mark.skip(reason="Current log output in console, we can not capture from logs")
Expand Down
4 changes: 2 additions & 2 deletions streaming/python/raystreaming/tests/test_hybrid_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import sys

import ray
from ray.streaming import StreamingContext
from raystreaming import StreamingContext
from ray._private.test_utils import wait_for_condition
import pytest

Expand Down Expand Up @@ -32,7 +32,7 @@ def test_hybrid_stream():
)
current_dir = os.path.abspath(os.path.dirname(__file__))
jar_path = os.path.join(
current_dir, "../../../bazel-bin/streaming/java/all_streaming_tests_deploy.jar"
current_dir, "../../../bazel-bin/java/all_streaming_tests_deploy.jar"
)
jar_path = os.path.abspath(jar_path)
print("jar_path", jar_path)
Expand Down
2 changes: 1 addition & 1 deletion streaming/python/raystreaming/tests/test_stream.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import sys

import ray
from ray.streaming import StreamingContext
from raystreaming import StreamingContext


def test_data_stream():
Expand Down
2 changes: 1 addition & 1 deletion streaming/python/raystreaming/tests/test_union_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import sys

import ray
from ray.streaming import StreamingContext
from raystreaming import StreamingContext


def test_union_stream():
Expand Down
Loading

0 comments on commit 6656df7

Please sign in to comment.