Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#1078] Streaming with Monix's Observable #1112

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

Avasil
Copy link

@Avasil Avasil commented May 15, 2019

Related to #1078

Observable doesn't have the right kind but I think I managed to find decent workaround using

type ObservableF[F[_], A] = Observable[A]

// coproducts were inferring `Observable[A]` instead of alias
implicit def aliasResponseToRealResponse[F[_], A, CT <: Application.Json](implicit
  tr: ToResponse.Aux[F, ObservableF[F, A], CT]
): ToResponse.Aux[F, Observable[A], CT] = tr

Observable is built with Task internally and depends on Scheduler so it will probably stay this way. On the other hand, it provides many methods that can work with polymorphic effects.

It's still Task under the hood so it's not that nice but if you provide Scheduler you can use other F and I'm pretty sure it would still be more performant than alternatives.

Do you think this approach will work correctly?
Despite some flaws, it would be fantastic to have server-side support for streaming with Monix Observable for its users. :)

@codecov-io
Copy link

Codecov Report

Merging #1112 into master will decrease coverage by 2.19%.
The diff coverage is 34.61%.

Impacted file tree graph

@@            Coverage Diff            @@
##           master    #1112     +/-   ##
=========================================
- Coverage   80.25%   78.05%   -2.2%     
=========================================
  Files          54       56      +2     
  Lines        1028     1080     +52     
  Branches       47       48      +1     
=========================================
+ Hits          825      843     +18     
- Misses        203      237     +34
Impacted Files Coverage Δ
examples/src/main/scala/io/finch/monix/Main.scala 0% <0%> (ø)
circe/src/main/scala/io/finch/circe/Decoders.scala 70% <0%> (-30%) ⬇️
monix/src/main/scala/io/finch/monix/package.scala 62.06% <62.06%> (ø)
core/src/main/scala/io/finch/Endpoint.scala 72.57% <0%> (ø) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 32c6ac5...eb7f264. Read the comment docs.

@sergeykolbasov
Copy link
Collaborator

Thanks, @Avasil!

There is also PR #1098 built on top of Iterant. I wonder if we can unite them under the same module.

@Avasil
Copy link
Author

Avasil commented May 16, 2019

@sergeykolbasov I don't see anything against it (the module will add monix-tail dependency but it's not that big)

I've noticed I missed test for Circe module, I'll add it tonight

@Avasil
Copy link
Author

Avasil commented May 17, 2019

@sergeykolbasov I have problem with CirceSpec - I added test for Observable but it fails with following error:

[info] CirceSpec:
Reporter completed abruptly with an exception after receiving event: TestFailed(Ordinal(0, 3),GeneratorDrivenPropertyCheckFailedException was thrown during property evaluation.
 (AbstractJsonSpec.scala:41)
  Falsified after 1 successful property evaluations.
  Location: (AbstractJsonSpec.scala:41)
  Occurred when passed generated values (
    arg0 = List("ExampleNestedCaseClass(xqIxgbG,4.3349875045684245E-243,-1,List(-2147483648),ExampleCaseClass(anyooanbgz,-2147483648,false))"), // 3 shrinks
    arg1 = UTF-32
  )
  Label of failing property:
    Expected: List("ExampleNestedCaseClass(xqIxgbG,4.3349875045684245E-243,-1,List(-2147483648),ExampleCaseClass(anyooanbgz,-2147483648,false))")
Received: List(),CirceSpec,io.finch.circe.test.CirceSpec,Some(io.finch.circe.test.CirceSpec),should monix-circe.enumerate.success.ExampleNestedCaseClass,should monix-circe.enumerate.success.ExampleNestedCaseClass,Vector(),Some(org.scalatest.exceptions.GeneratorDrivenPropertyCheckFailedException: GeneratorDrivenPropertyCheckFailedException was thrown during property evaluation.
 (AbstractJsonSpec.scala:41)
  Falsified after 1 successful property evaluations.
  Location: (AbstractJsonSpec.scala:41)
  Occurred when passed generated values (
    arg0 = List("ExampleNestedCaseClass(xqIxgbG,4.3349875045684245E-243,-1,List(-2147483648),ExampleCaseClass(anyooanbgz,-2147483648,false))"), // 3 shrinks
    arg1 = UTF-32
  )
  Label of failing property:
    Expected: List("ExampleNestedCaseClass(xqIxgbG,4.3349875045684245E-243,-1,List(-2147483648),ExampleCaseClass(anyooanbgz,-2147483648,false))")
Received: List()),Some(790),Some(IndentedText(- should monix-circe.enumerate.success.ExampleNestedCaseClass,should monix-circe.enumerate.success.ExampleNestedCaseClass,0)),Some(SeeStackDepthException),Some(io.finch.circe.test.CirceSpec),None,pool-1-thread-1-ScalaTest-running-CirceSpec,1558125731073).
java.io.NotSerializableException: sun.nio.cs.UTF_32
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:473)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.scalatest.tools.SocketReporter.apply(SocketReporter.scala:31)
        at org.scalatest.DispatchReporter$Propagator.$anonfun$run$10(DispatchReporter.scala:249)
        at org.scalatest.DispatchReporter$Propagator.$anonfun$run$10$adapted(DispatchReporter.scala:248)
        at scala.collection.immutable.List.foreach(List.scala:388)
        at org.scalatest.DispatchReporter$Propagator.run(DispatchReporter.scala:248)
        at java.lang.Thread.run(Thread.java:745)
Exception in thread "Thread-13" java.io.WriteAbortedException: writing aborted; java.io.NotSerializableException: sun.nio.cs.UTF_32
[info] - should monix-circe.enumerate.success.ExampleNestedCaseClass *** FAILED ***

Any tips on what to look for? I assume I violate some kind of protocol

@joroKr21
Copy link
Collaborator

I think this would have to wait for CE3 support in Monix: monix/monix#1533

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants