Discussion:
[akka-user] how to define materialized value
Arun Sethia
2016-03-06 00:00:21 UTC
Hi,

Can someone explain what a materialized value is? I have read the
documentation at
http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/scala/stream-quickstart.html#transforming-and-consuming-simple-streams


I am not sure how a Flow can define its materialized type. For example, the
following code has input Tweet and output Int, but Mat is Unit. I would
like to see how to define Mat as Int, or any example where a Flow or
Source defines a Mat other than Unit.

val count: Flow[Tweet, Int, Unit] = Flow[Tweet].map(_ => 1)

I find it quite confusing to understand the difference between "Out" and
"Mat".


Thanks

As
--
Read the docs: http://akka.io/docs/
Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+***@googlegroups.com.
To post to this group, send email to akka-***@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
Rafał Krzewski
2016-03-06 00:46:11 UTC
Hi,
there are a few ways of doing that. Probably the simplest one is using
Flow.mapMaterializedValue. Suppose you'd like to create a Flow that counts
the elements that pass through it and makes the current count available
through a "side channel":

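import java.util.concurrent.atomic.AtomicLong
import akka.stream.scaladsl.Flow

trait Counter {
  def get: Long
}

def counter[T]: Flow[T, T, Counter] = {
  // Created once per call to counter[T]; every materialization of the
  // returned Flow shares this same AtomicLong.
  val internalCounter = new AtomicLong(0)
  Flow[T].map { elem =>
    // Count each element, then pass it on unchanged.
    internalCounter.incrementAndGet()
    elem
  }.mapMaterializedValue(_ => new Counter {
    // Expose read-only access to the running count.
    override def get: Long = internalCounter.get
  })
}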

Another way is using a GraphStageWithMaterializedValue while building a
custom Flow / Sink / Source. Instead of returning a GraphStageLogic, like
an ordinary GraphStage, you return a pair of GraphStageLogic and the
materialized value.
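
To make the second approach concrete, here is a minimal sketch of a counting
stage built on GraphStageWithMaterializedValue. It assumes Akka 2.6.x, where
an implicit ActorSystem is enough to run a stream. CountingStage and
CountingStageDemo are hypothetical names chosen for illustration, and Counter
is the same small trait as above. Unlike the mapMaterializedValue version,
the AtomicLong is created inside createLogicAndMaterializedValue, so each
materialization gets its own counter.

```scala
import java.util.concurrent.atomic.AtomicLong

import akka.actor.ActorSystem
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler}

import scala.concurrent.Await
import scala.concurrent.duration._

trait Counter { def get: Long }

// Hypothetical stage: passes every element through unchanged and counts it.
final class CountingStage[T]
    extends GraphStageWithMaterializedValue[FlowShape[T, T], Counter] {

  val in: Inlet[T] = Inlet[T]("CountingStage.in")
  val out: Outlet[T] = Outlet[T]("CountingStage.out")
  override val shape: FlowShape[T, T] = FlowShape(in, out)

  override def createLogicAndMaterializedValue(
      inheritedAttributes: Attributes): (GraphStageLogic, Counter) = {
    // Created inside this method, so each materialization gets its own counter.
    val count = new AtomicLong(0)

    val logic = new GraphStageLogic(shape) with InHandler with OutHandler {
      override def onPush(): Unit = {
        count.incrementAndGet()
        push(out, grab(in))
      }
      override def onPull(): Unit = pull(in)
      setHandlers(in, out, this)
    }

    // The pair: stage logic plus the value handed to the outside world.
    (logic, new Counter { override def get: Long = count.get })
  }
}

object CountingStageDemo {
  // Runs 1 to 5 through the stage and returns (element count, sum).
  def run(): (Long, Int) = {
    implicit val system: ActorSystem = ActorSystem("counting-stage-demo")
    try {
      val (counter, futureSum) =
        Source(1 to 5)
          .viaMat(new CountingStage[Int])(Keep.right)
          .toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.both)
          .run()
      val sum = Await.result(futureSum, 3.seconds)
      (counter.get, sum)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    val (count, sum) = run()
    println(s"count=$count, sum=$sum")
  }
}
```

The stage can be passed to viaMat directly because a GraphStage is itself a
Graph; wrapping it in Flow.fromGraph gives a Flow[T, T, Counter] if the Flow
API is preferred.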

Cheers,
Rafał

Arun Sethia
2016-03-06 05:25:56 UTC
Thanks.

I tried to put together an example using this:

val source = Source(1 to 5).filter(x => x % 2 == 0)

val sink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

val runnableGraph = source.via(counter[Int]).toMat(sink)(Keep.both)

val result = runnableGraph.run()

def counter[T]: Flow[T, T, Counter] = {
  val internalCounter = new AtomicLong(0)
  Flow[T].map { elem =>
    internalCounter.incrementAndGet()
    elem
  }.mapMaterializedValue(_ => new Counter {
    override def get = internalCounter.get
  })
}

Since I am using Keep.both, the result should give me a tuple holding the
count (from the Flow) and the sum (from the Sink), but it only gives me the
Future[Int].

What is the difference between a materialized value and "out"?

Thanks
Arun
j***@dataqube.de
2018-10-22 08:32:03 UTC
Hi Rafal,

I just stumbled upon your reply. Could you explain why you have to wrap the
AtomicLong into the Counter trait? I tried returning the naked AtomicLong
in the 'mapMaterializedValue' function, but the resulting value was always
0. Why does wrapping in a trait produce a different result?

Best,
Jurgis
--
*****************************************************************************************************
** New discussion forum: https://discuss.akka.io/ replacing akka-user google-group soon.
** This group will soon be put into read-only mode, and replaced by discuss.akka.io
** More details: https://akka.io/blog/news/2018/03/13/discuss.akka.io-announced
*****************************************************************************************************
Martynas Mickevičius
2018-11-05 06:18:59 UTC
Did you send any elements to the stream? internalCounter is incremented in
the map operator for every incoming stream element.

The trait is only used to hide the implementation details of the counter.
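
As a rough check of that point, the sketch below returns the bare AtomicLong
as the materialized value instead of wrapping it in Counter. It assumes Akka
2.6.x; NakedCounterDemo is a hypothetical name. One thing to keep in mind is
that the stream runs asynchronously, so a read taken immediately after run()
can see any value from 0 upwards; the sketch therefore reads the count only
after the sink's Future has completed. Whether that timing played a part in
the always-0 result reported above is not something this thread establishes.

```scala
import java.util.concurrent.atomic.AtomicLong

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object NakedCounterDemo {
  // Same flow as in the thread, but the materialized value is the AtomicLong itself.
  def counter[T]: Flow[T, T, AtomicLong] = {
    val internalCounter = new AtomicLong(0)
    Flow[T].map { elem =>
      internalCounter.incrementAndGet()
      elem
    }.mapMaterializedValue(_ => internalCounter)
  }

  // Runs five elements through the flow and returns the count seen after completion.
  def run(): Long = {
    implicit val system: ActorSystem = ActorSystem("naked-counter-demo")
    try {
      val (count, done) =
        Source(1 to 5)
          .viaMat(counter[Int])(Keep.right)
          .toMat(Sink.ignore)(Keep.both)
          .run()
      // Wait for the stream to finish before reading the counter.
      Await.result(done, 3.seconds)
      count.get
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(s"count after completion: ${run()}")
}
```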
Martynas Mickevičius
2018-11-05 06:24:25 UTC
Here is a quick scalafiddle that runs the code from 2 years ago :)

https://scalafiddle.io/sf/xaGO6Yr/0
j***@dataqube.de
2018-11-05 16:35:42 UTC
I did send events to the stream, but the counter stayed at 0. Maybe the
error was in a different place in my setup. Thanks for the working code
example!
Arun Sethia
2016-03-06 04:27:15 UTC
Thanks Rafal.

Based on this, I tried to write sample code that counts the number of
elements being processed and computes their sum:

val source = Source(1 to 5).filter(x => x % 2 == 0)

val sink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

val runnableGraph = source.via(counter[Int]).toMat(sink)(Keep.both)

val result = runnableGraph.run()

def counter[T]: Flow[T, T, Counter] = {
  val internalCounter = new AtomicLong(0)
  Flow[T].map { elem =>
    internalCounter.incrementAndGet()
    elem
  }.mapMaterializedValue(_ => new Counter {
    override def get = internalCounter.get
  })
}

1. With Keep.both, the result should return both the count and the sum, but it does not. Why?

2. How are materialized values different from "out"? I cannot visualize the difference between the two.

Thanks
Arun
Rafał Krzewski
2016-03-06 13:20:43 UTC
Arun,

a little correction:

val runnableGraph =
  source.viaMat(counter[Int])(Keep.right).toMat(sink)(Keep.both)

And subsequently:

val (counter, futureSum) = runnableGraph.run()

Graph outlets are always streams. You need to connect them to a Sink
(through intervening Flows or more complex Graphs, as necessary) in order
to create a RunnableGraph. Materialized values are the other things used to
connect the RunnableGraph to the outside world that are *not* streams.
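
To see how the Keep combinators decide which materialized values survive,
the following sketch spells out the resulting types for three ways of wiring
the same source, counter flow and sink. It assumes Akka 2.6.x, where the
"empty" materialized value is akka.NotUsed rather than the Unit of the 2.0.x
documentation linked earlier; KeepDemo and the val names are hypothetical.

```scala
import java.util.concurrent.atomic.AtomicLong

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object KeepDemo {
  trait Counter { def get: Long }

  def counter[T]: Flow[T, T, Counter] = {
    val internalCounter = new AtomicLong(0)
    Flow[T].map { elem =>
      internalCounter.incrementAndGet()
      elem
    }.mapMaterializedValue(_ => new Counter {
      override def get: Long = internalCounter.get
    })
  }

  val source: Source[Int, NotUsed] = Source(1 to 5).filter(_ % 2 == 0)
  val sink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

  // via and to keep the left (upstream) value, so Counter and Future are both dropped.
  val leftOnly: RunnableGraph[NotUsed] =
    source.via(counter[Int]).to(sink)

  // via already dropped the Counter; Keep.both pairs the Source's NotUsed with the Future.
  val noCounter: RunnableGraph[(NotUsed, Future[Int])] =
    source.via(counter[Int]).toMat(sink)(Keep.both)

  // viaMat(...)(Keep.right) carries the Counter forward; Keep.both then pairs it with the Future.
  val both: RunnableGraph[(Counter, Future[Int])] =
    source.viaMat(counter[Int])(Keep.right).toMat(sink)(Keep.both)

  // Materializes the last graph and returns (element count, sum).
  def run(): (Long, Int) = {
    implicit val system: ActorSystem = ActorSystem("keep-demo")
    try {
      val (count, futureSum) = both.run()
      val sum = Await.result(futureSum, 3.seconds)
      (count.get, sum)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The elements themselves (2 and 4 here) travel through the outlets and never
appear in the value returned by run(); only the Counter and the Future do.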

For example, Sink.fold creates a stream element that is (obviously) a Sink.
It does not have any stream outlets. However, it provides a materialized
value Future[U] that is completed when the Sink's inlet stream is
exhausted. This is how a running stream can communicate its successful
completion or failure to the outside world.

Another example is Source.actorPublisher: you provide it with Props for an
Actor that implements the ActorPublisher contract. When materializing the
stream, the Source will instantiate the Actor and return its ActorRef as a
materialized value. The Actor is internal to the stream, but you can use the
ActorRef as an interface from the outside world into the stream: send
messages (using your own protocol) to be passed to the Source's outlet,
according to demand from downstream. The tricky part is that such a gateway
Actor must manage buffering and/or backpressure on its own!
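
The same "materialized value as a gateway into the stream" idea can be
sketched without writing an Actor. The example below swaps in a different
technique, Source.queue, whose materialized value is a queue handle that
manages the buffer itself. It is not the ActorPublisher approach described
above, which as far as I know was deprecated in Akka 2.5 and removed in 2.6.
The sketch assumes Akka 2.6.x; QueueSourceDemo is a hypothetical name.

```scala
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object QueueSourceDemo {
  // Pushes 1, 2, 3 into the stream from outside and returns their sum.
  def run(): Int = {
    implicit val system: ActorSystem = ActorSystem("queue-source-demo")
    import system.dispatcher // ExecutionContext for the Future chain below

    try {
      // Left value: the queue handle (the way in). Right value: the fold result (the way out).
      val (queue, futureSum) =
        Source.queue[Int](16, OverflowStrategy.backpressure)
          .toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.both)
          .run()

      // Offer elements one at a time, waiting for each to be accepted,
      // then signal that no more elements will follow.
      val offered = for {
        _ <- queue.offer(1)
        _ <- queue.offer(2)
        _ <- queue.offer(3)
      } yield queue.complete()

      Await.result(offered, 3.seconds)
      Await.result(futureSum, 3.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(s"sum=${run()}")
}
```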

Besides that, you can use materialized values to monitor stream execution
from the outside, as in the Counter example above or in
https://github.com/akka/akka/pull/19836, or to interrupt a stream that
would otherwise run for a long (or unlimited) time:
https://github.com/rkrzewski/akka-cluster-etcd/blob/master/etcd-client/src/main/scala/pl/caltha/akka/streams/FlowBreaker.scala

Cheers,
Rafał

Arun Sethia
2016-03-06 17:24:15 UTC
Awesome, thanks a lot.
Arun Sethia
2016-03-06 00:31:52 UTC
Given the following code:

val source = Source(1 to 5).filter(x => x % 2 == 0)

val flow: Flow[Int, Int, Unit] = Flow[Int].map(x => x * 2)

val sink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

val runnableGraph = source.via(flow).toMat(sink)(Keep.both)

I am not sure what the point of Keep.both versus Keep.left is. I thought that with Keep.both I would get the values of the flow and the sink as a tuple.
Arun Sethia
2016-03-06 00:19:08 UTC
One more question:

In Source[+Out, +Mat], Flow[-In, +Out, +Mat] and Sink[-In, +Mat], what is
the +Mat type in each case, and how can I define one? Any example would be
great.