Class TestSubscriber.ManualProbe<I>

java.lang.Object
org.apache.pekko.stream.testkit.TestSubscriber.ManualProbe<I>
All Implemented Interfaces:
org.reactivestreams.Subscriber<I>
Direct Known Subclasses:
TestSubscriber.Probe
Enclosing class:
TestSubscriber

public static class TestSubscriber.ManualProbe<I> extends Object implements org.reactivestreams.Subscriber<I>
Implementation of Subscriber that allows various assertions.

All timeouts are dilated automatically, for more details about time dilation refer to pekko.testkit.TestKit.

  • Method Details

    • expectNext

      public TestSubscriber.ManualProbe<I> expectNext(I e1, I e2, Object... es)
      Fluent DSL

      Expect multiple stream elements.

    • expectNextUnordered

      public TestSubscriber.ManualProbe<I> expectNextUnordered(I e1, I e2, Object... es)
      Fluent DSL

      Expect multiple stream elements in arbitrary order.

    • expectSubscription

      public org.reactivestreams.Subscription expectSubscription()
      Expect and return a Subscription.
    • expectEvent

      public TestSubscriber.SubscriberEvent expectEvent()
      Expect and return TestSubscriber.SubscriberEvent (any of: OnSubscribe, OnNext, OnError or OnComplete).
    • expectEvent

      public TestSubscriber.SubscriberEvent expectEvent(scala.concurrent.duration.FiniteDuration max)
      Expect and return TestSubscriber.SubscriberEvent (any of: OnSubscribe, OnNext, OnError or OnComplete).
    • expectEvent

      public TestSubscriber.SubscriberEvent expectEvent(Duration max)
      JAVA API

      Expect and return TestSubscriber.SubscriberEvent (any of: OnSubscribe, OnNext, OnError or OnComplete).

      Since:
      1.1.0
    • expectEvent

      Fluent DSL

      Expect TestSubscriber.SubscriberEvent (any of: OnSubscribe, OnNext, OnError or OnComplete).

    • expectNext

      public I expectNext()
      Expect and return a stream element.
    • expectNext

      public I expectNext(scala.concurrent.duration.FiniteDuration d)
      Expect and return a stream element during specified time or timeout.
    • expectNext

      public I expectNext(Duration d)
      JAVA API

      Expect and return a stream element during specified time or timeout.

      Since:
      1.1.0
    • expectNext

      public TestSubscriber.ManualProbe<I> expectNext(I element)
      Fluent DSL

      Expect a stream element.

    • expectNext

      public TestSubscriber.ManualProbe<I> expectNext(scala.concurrent.duration.FiniteDuration d, I element)
      Fluent DSL

      Expect a stream element during specified time or timeout.

    • expectNext

      public TestSubscriber.ManualProbe<I> expectNext(Duration d, I element)
      JAVA API

      Fluent DSL

      Expect a stream element during specified time or timeout.

      Since:
      1.1.0
    • expectNext

      public TestSubscriber.ManualProbe<I> expectNext(I e1, I e2, scala.collection.immutable.Seq<I> es)
      Fluent DSL

      Expect multiple stream elements.

    • expectNextUnordered

      public TestSubscriber.ManualProbe<I> expectNextUnordered(I e1, I e2, scala.collection.immutable.Seq<I> es)
      Fluent DSL

      Expect multiple stream elements in arbitrary order.

    • expectNextN

      public scala.collection.immutable.Seq<I> expectNextN(long n)
      Expect and return the next n stream elements.
    • expectNextN

      public TestSubscriber.ManualProbe<I> expectNextN(scala.collection.immutable.Seq<I> all)
      Fluent DSL Expect the given elements to be signalled in order.
    • expectNextN

      public TestSubscriber.ManualProbe<I> expectNextN(List<I> elems)
      Fluent DSL Expect the given elements to be signalled in order.
      Since:
      1.1.0
    • expectNextUnorderedN

      public TestSubscriber.ManualProbe<I> expectNextUnorderedN(scala.collection.immutable.Seq<I> all)
      Fluent DSL Expect the given elements to be signalled in any order.
    • expectNextUnorderedN

      public TestSubscriber.ManualProbe<I> expectNextUnorderedN(List<I> all)
      JAVA API

      Fluent DSL Expect the given elements to be signalled in any order.

      Since:
      1.1.0
    • expectComplete

      public TestSubscriber.ManualProbe<I> expectComplete()
      Fluent DSL

      Expect completion.

    • expectError

      public Throwable expectError()
      Expect and return the signalled Throwable.
    • expectError

      public TestSubscriber.ManualProbe<I> expectError(Throwable cause)
      Fluent DSL

      Expect given Throwable.

    • expectSubscriptionAndError

      public Throwable expectSubscriptionAndError()
      Expect subscription to be followed immediately by an error signal.

      By default 1 demand will be signalled in order to wake up a possibly lazy upstream.

      See also {@link #expectSubscriptionAndError(signalDemand:Boolean)* #expectSubscriptionAndError(signalDemand: Boolean)} if no demand should be signalled.

    • expectSubscriptionAndError

      public Throwable expectSubscriptionAndError(boolean signalDemand)
      Expect subscription to be followed immediately by an error signal.

      Depending on the signalDemand parameter demand may be signalled immediately after obtaining the subscription in order to wake up a possibly lazy upstream. You can disable this by setting the signalDemand parameter to false.

      See also {@link #expectSubscriptionAndError()* #expectSubscriptionAndError()}.

    • expectSubscriptionAndError

      public TestSubscriber.ManualProbe<I> expectSubscriptionAndError(Throwable cause)
      Fluent DSL

      Expect subscription followed by immediate stream completion.

      By default 1 demand will be signalled in order to wake up a possibly lazy upstream.

      See also {@link #expectSubscriptionAndError(cause:Throwable,signalDemand:Boolean)* #expectSubscriptionAndError(cause: Throwable, signalDemand: Boolean)} if no demand should be signalled.

    • expectSubscriptionAndError

      public TestSubscriber.ManualProbe<I> expectSubscriptionAndError(Throwable cause, boolean signalDemand)
      Fluent DSL

      Expect subscription followed by immediate stream completion. By default 1 demand will be signalled in order to wake up a possibly lazy upstream

      See also {@link #expectSubscriptionAndError(cause:Throwable)* #expectSubscriptionAndError(cause: Throwable)}.

    • expectSubscriptionAndComplete

      public TestSubscriber.ManualProbe<I> expectSubscriptionAndComplete()
      Fluent DSL

      Expect subscription followed by immediate stream completion. By default 1 demand will be signalled in order to wake up a possibly lazy upstream

      See also {@link #expectSubscriptionAndComplete(signalDemand:Boolean)* #expectSubscriptionAndComplete(signalDemand: Boolean)} if no demand should be signalled.

    • expectSubscriptionAndComplete

      public TestSubscriber.ManualProbe<I> expectSubscriptionAndComplete(boolean signalDemand)
      Fluent DSL

      Expect subscription followed by immediate stream completion.

      Depending on the signalDemand parameter demand may be signalled immediately after obtaining the subscription in order to wake up a possibly lazy upstream. You can disable this by setting the signalDemand parameter to false.

      See also {@link #expectSubscriptionAndComplete()* #expectSubscriptionAndComplete}.

    • expectNextOrError

      public scala.util.Either<Throwable,I> expectNextOrError()
      Fluent DSL

      Expect given next element or error signal, returning whichever was signalled.

    • expectNextOrError

      public scala.util.Either<Throwable,I> expectNextOrError(I element, Throwable cause)
      Fluent DSL Expect given next element or error signal.
    • expectNextOrComplete

      public scala.util.Either<TestSubscriber.OnComplete$,I> expectNextOrComplete()
      Expect next element or stream completion - returning whichever was signalled.
    • expectNextOrComplete

      public TestSubscriber.ManualProbe<I> expectNextOrComplete(I element)
      Fluent DSL

      Expect given next element or stream completion.

    • expectNoMessage

      public TestSubscriber.ManualProbe<I> expectNoMessage(scala.concurrent.duration.FiniteDuration remaining)
      Fluent DSL

      Assert that no message is received for the specified time.

    • expectNoMessage

      public TestSubscriber.ManualProbe<I> expectNoMessage()
      Fluent DSL

      Assert that no message is received for the specified time. Waits for the default period configured as pekko.test.expect-no-message-default. That timeout is scaled using the configuration entry "pekko.test.timefactor".

    • expectNoMessage

      public TestSubscriber.ManualProbe<I> expectNoMessage(Duration remaining)
      Java API: Assert that no message is received for the specified time.
    • expectNextPF

      public <T> T expectNextPF(scala.PartialFunction<Object,T> f)
      Expect a stream element and test it with partial function.
    • expectNextWithTimeoutPF

      public <T> T expectNextWithTimeoutPF(scala.concurrent.duration.Duration max, scala.PartialFunction<Object,T> f)
      Expect a stream element and test it with partial function.

      Parameters:
      max - wait no more than max time, otherwise throw AssertionError
    • expectNextWithTimeoutPF

      public <T> T expectNextWithTimeoutPF(Duration max, scala.PartialFunction<Object,T> f)
      JAVA API

      Expect a stream element and test it with partial function.

      Parameters:
      max - wait no more than max time, otherwise throw AssertionError
      Since:
      1.1.0
    • expectNextChainingPF

      public TestSubscriber.ManualProbe<I> expectNextChainingPF(scala.concurrent.duration.Duration max, scala.PartialFunction<Object,Object> f)
      Expect a stream element during specified time or timeout and test it with partial function.

      Allows chaining probe methods.

      Parameters:
      max - wait no more than max time, otherwise throw AssertionError
    • expectNextChainingPF

      public TestSubscriber.ManualProbe<I> expectNextChainingPF(Duration max, scala.PartialFunction<Object,Object> f)
      JAVA API

      Expect a stream element during specified time or timeout and test it with partial function.

      Allows chaining probe methods.

      Parameters:
      max - wait no more than max time, otherwise throw AssertionError
      Since:
      1.1.0
    • expectNextChainingPF

      public TestSubscriber.ManualProbe<I> expectNextChainingPF(scala.PartialFunction<Object,Object> f)
      Expect a stream element during specified time or timeout and test it with partial function.

      Allows chaining probe methods.

    • expectEventWithTimeoutPF

      public <T> T expectEventWithTimeoutPF(scala.concurrent.duration.Duration max, scala.PartialFunction<TestSubscriber.SubscriberEvent,T> f)
    • expectEventWithTimeoutPF

      public <T> T expectEventWithTimeoutPF(Duration max, scala.PartialFunction<TestSubscriber.SubscriberEvent,T> f)
      JAVA API
      Since:
      1.1.0
    • expectEventPF

      public <T> T expectEventPF(scala.PartialFunction<TestSubscriber.SubscriberEvent,T> f)
    • receiveWhile

      public <T> scala.collection.immutable.Seq<T> receiveWhile(scala.concurrent.duration.Duration max, scala.concurrent.duration.Duration idle, int messages, scala.PartialFunction<TestSubscriber.SubscriberEvent,T> f)
      Receive messages for a given duration or until one does not match a given partial function.
    • receiveWhile

      public <T> List<T> receiveWhile(Duration max, Duration idle, int messages, scala.PartialFunction<TestSubscriber.SubscriberEvent,T> f)
      JAVA API

      Receive messages for a given duration or until one does not match a given partial function.

      Since:
      1.1.0
    • receiveWhile$default$1

      public <T> scala.concurrent.duration.Duration receiveWhile$default$1()
    • receiveWhile$default$2

      public <T> scala.concurrent.duration.Duration receiveWhile$default$2()
    • receiveWhile$default$3

      public <T> int receiveWhile$default$3()
    • receiveWithin

      public scala.collection.immutable.Seq<I> receiveWithin(scala.concurrent.duration.FiniteDuration max, int messages)
      Drains a given number of messages
    • receiveWithin

      public List<I> receiveWithin(Duration max, int messages)
      JAVA API

      Drains a given number of messages

      Since:
      1.1.0
    • receiveWithin$default$2

      public int receiveWithin$default$2()
    • toStrict

      public scala.collection.immutable.Seq<I> toStrict(scala.concurrent.duration.FiniteDuration atMost)
      Attempt to drain the stream into a strict collection (by requesting Long.MaxValue elements).

      '''Use with caution: Be warned that this may not be a good idea if the stream is infinite or its elements are very large!'''

    • toStrict

      public List<I> toStrict(Duration atMost)
      JAVA API

      Attempt to drain the stream into a strict collection (by requesting Long.MaxValue elements).

      '''Use with caution: Be warned that this may not be a good idea if the stream is infinite or its elements are very large!'''

      Since:
      1.1.0
    • within

      public <T> T within(scala.concurrent.duration.FiniteDuration min, scala.concurrent.duration.FiniteDuration max, scala.Function0<T> f)
      Execute code block while bounding its execution time between min and max. within blocks may be nested. All methods in this trait which take maximum wait times are available in a version which implicitly uses the remaining time governed by the innermost enclosing within block.

      Note that the timeout is scaled using Duration.dilated, which uses the configuration entry "pekko.test.timefactor", while the min Duration is not.

      
       val ret = within(50 millis) {
         test ! "ping"
         expectMsgClass(classOf[String])
       }
       
    • within

      public <T> T within(Duration min, Duration max, Creator<T> creator)
      JAVA API

      Execute code block while bounding its execution time between min and max. within blocks may be nested. All methods in this trait which take maximum wait times are available in a version which implicitly uses the remaining time governed by the innermost enclosing within block.

      Note that the timeout is scaled using Duration.dilated, which uses the configuration entry "pekko.test.timefactor", while the min Duration is not.

      
       val ret = within(Duration.ofMillis(50)) {
         test ! "ping"
         expectMsgClass(classOf[String])
       }
       

      Since:
      1.1.0
    • within

      public <T> T within(scala.concurrent.duration.FiniteDuration max, scala.Function0<T> f)
      Same as calling within(0 seconds, max)(f).
    • within

      public <T> T within(Duration max, Creator<T> creator)
      JAVA API

      Same as calling within(Duration.ofSeconds(0), max)(f).

      Since:
      1.1.0
    • onSubscribe

      public void onSubscribe(org.reactivestreams.Subscription subscription)
      Specified by:
      onSubscribe in interface org.reactivestreams.Subscriber<I>
    • onNext

      public void onNext(I element)
      Specified by:
      onNext in interface org.reactivestreams.Subscriber<I>
    • onComplete

      public void onComplete()
      Specified by:
      onComplete in interface org.reactivestreams.Subscriber<I>
    • onError

      public void onError(Throwable cause)
      Specified by:
      onError in interface org.reactivestreams.Subscriber<I>