-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathReactiveTweets.scala
105 lines (82 loc) · 3.19 KB
/
ReactiveTweets.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package akka.examples.stream
import java.nio.file.Paths
import akka.Done
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, ClosedShape, IOResult}
import akka.util.ByteString
import scala.concurrent.Future
/**
 * Akka Streams quickstart examples over a stream of tweets.
 *
 * Example code from http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-quickstart.html#Time-Based_Processing
 *
 * NOTE(review): initialising this object eagerly creates an `ActorSystem` named "QuickStart"; nothing
 * visible here terminates it, so callers are presumably responsible for `system.terminate()`.
 */
object ReactiveTweets {
  final case class Author(handle: String)
  final case class Hashtag(name: String)

  final case class Tweet(author: Author, timestamp: Long, body: String) {

    /** Tokens of `body` (split on single spaces) that start with `#`, deduplicated; the `#` is kept. */
    def hashtags: Set[Hashtag] =
      body.split(" ").collect { case t if t.startsWith("#") => Hashtag(t) }.toSet
  }

  // Note: this val shadows the `akka` root package inside this object, so Akka types must be
  // referenced through the file-level imports (or `_root_.akka...`) rather than `akka.X` here.
  val akka: Hashtag = Hashtag("#akka")

  implicit val system: ActorSystem = ActorSystem("QuickStart")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  /**
   * Reads tweets from a text file holding one tweet per line as `author<TAB>timestamp<TAB>body`.
   *
   * Lines are split on `\n` and any trailing `\r` is stripped, so both LF and CRLF files parse
   * regardless of the platform's `System.lineSeparator()`. Blank (whitespace-only) lines are skipped.
   * Only the first two tabs separate fields, so a body that itself contains tabs is kept intact.
   *
   * The stream fails with an `IllegalArgumentException` (or its subclass `NumberFormatException`)
   * on a line without three fields or with a non-numeric timestamp, and with a framing error on a
   * line longer than 1024 bytes.
   *
   * @param path file system path of the input file
   * @return source of parsed tweets, materializing the `IOResult` of the file read
   */
  def tweetStreamFromFile(path: String): Source[Tweet, Future[IOResult]] =
    FileIO
      .fromPath(Paths.get(path))
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = true))
      .map(_.utf8String.stripSuffix("\r"))
      .filter(_.trim.nonEmpty)
      .map(parseTweet)

  /**
   * Parses a single `author<TAB>timestamp<TAB>body` line.
   *
   * @throws IllegalArgumentException if the line has fewer than three tab-separated fields, or
   *         `NumberFormatException` (a subclass) if the timestamp is not a valid `Long`
   */
  private def parseTweet(line: String): Tweet =
    // limit = 3: everything after the second tab belongs to the body, tabs included.
    line.split("\t", 3) match {
      case Array(handle, timestamp, body) => Tweet(Author(handle), timestamp.toLong, body)
      case _ =>
        throw new IllegalArgumentException(
          s"Malformed tweet line, expected author<TAB>timestamp<TAB>body: $line")
    }

  /** Authors of tweets tagged `#akka`, in stream order (one element per matching tweet, not deduplicated). */
  def extractAuthorsFromAkkaTweets(tweets: Source[Tweet, _]): Source[Author, _] =
    tweets
      .filter(_.hashtags.contains(akka))
      .map(_.author)

  /** Every hashtag of every tweet, flattened into a single stream. */
  def extractHashTagsFromTweets(tweets: Source[Tweet, _]): Source[Hashtag, _] =
    tweets.mapConcat(_.hashtags.toList)

  /** Runs `s`, printing each element with `println`; the future completes when the stream does. */
  def printStream(s: Source[_, _]): Future[Done] = s.runForeach(println)

  /** Sink writing raw bytes to `/tmp/authors`; materializes the write's `IOResult`. */
  def writeAuthors: Sink[ByteString, Future[IOResult]] = FileIO.toPath(Paths.get("/tmp/authors"))

  /** Sink writing raw bytes to `/tmp/hashtags`; materializes the write's `IOResult`. */
  def writeHashtags: Sink[ByteString, Future[IOResult]] = FileIO.toPath(Paths.get("/tmp/hashtags"))

  /** Encodes each string as one output line terminated by the platform line separator. */
  private val lineBytes: Flow[String, ByteString, NotUsed] =
    Flow[String].map(line => ByteString(line + System.lineSeparator()))

  /**
   * Example from http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-quickstart.html#Broadcasting_a_stream
   *
   * Broadcasts `tweets` to two branches. The first writes each author handle, one per line, through
   * `writeAuthors`. The second writes every hashtag name, one per line, through `writeHashtags`.
   * Both branches encode their lines with the shared `lineBytes` flow.
   *
   * NOTE(review): the graph materializes `NotUsed`, so the sinks' `Future[IOResult]`s are discarded
   * and the caller gets no signal for when (or whether) the files were fully written.
   */
  def broadcastingTweetsExample(tweets: Source[Tweet, _]): NotUsed = {
    val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      val bcast = b.add(Broadcast[Tweet](2))
      tweets ~> bcast.in
      bcast.out(0) ~> Flow[Tweet].map(_.author.handle).via(lineBytes) ~> writeAuthors
      bcast.out(1) ~> Flow[Tweet].mapConcat(_.hashtags.toList).map(_.name).via(lineBytes) ~> writeHashtags
      ClosedShape
    })
    graph.run()
  }

  /**
   * Example from http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-quickstart.html#Materialized_values
   *
   * Sink that sums every incoming `Int`, starting from 0, and materializes the total. It is used
   * below to count tweets via the materialized value of the processing pipeline.
   */
  val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

  /**
   * Counts the tweets in `tweets`, spelling out each stage explicitly.
   * `tweetCounts2` computes the same value with more compact syntax.
   */
  def tweetCounts(tweets: Source[Tweet, _]): Future[Int] = {
    val count: Flow[Tweet, Int, NotUsed] = Flow[Tweet].map(_ => 1)
    // Keep.right keeps the sink's Future[Int] rather than the source's materialized value.
    val counterGraph: RunnableGraph[Future[Int]] =
      tweets
        .via(count)
        .toMat(sumSink)(Keep.right)
    counterGraph.run()
  }

  /** Counts the tweets in `tweets`; compact equivalent of `tweetCounts` using `runWith`. */
  def tweetCounts2(tweets: Source[Tweet, _]): Future[Int] =
    tweets.map(_ => 1).runWith(sumSink)
}