# Stream processing

## 1. Message queue

In this section we will introduce Apache Kafka ( https://kafka.apache.org/ ). Kafka is a distributed message broker where you can subscribe to a topic and receive a stream of messages. You will also see how to publish messages. Let's start with the initial setup.

You will be using the pykafka library ( http://pykafka.readthedocs.io/en/latest/ ). It should already be installed on the ICCluster, but you may need to install it locally if you work on your own computer.

In [1]:
from pykafka import KafkaClient
import getpass

ZOOKEEPER_QUORUM = 'iccluster042.iccluster.epfl.ch:2181,iccluster054.iccluster.epfl.ch:2181,iccluster078.iccluster.epfl.ch:2181'

username = getpass.getuser()

client = KafkaClient(zookeeper_hosts=ZOOKEEPER_QUORUM)

**1.a** Look at the pykafka documentation and list the topics currently available:

In [2]:
#TODO: print the topics
client.topics

{b'0': <pykafka.topic.Topic at 0x7f89d56e9588 (name=b'0')>,
 b'ATLAS_ENTITIES': <pykafka.topic.Topic at 0x7f89e44a7ef0 (name=b'ATLAS_ENTITIES')>,
 b'ATLAS_HOOK': <pykafka.topic.Topic at 0x7f89d57214e0 (name=b'ATLAS_HOOK')>,
 b'NSS': <pykafka.topic.Topic at 0x7f89d57287f0 (name=b'NSS')>,
 b'ambari_kafka_service_check': <pykafka.topic.Topic at 0x7f89d407be10 (name=b'ambari_kafka_service_check')>,
 b'name': <pykafka.topic.Topic at 0x7f89d407b9b0 (name=b'name')>,
 b'ndovloketnl-arrivals': <pykafka.topic.Topic at 0x7f89d5728048 (name=b'ndovloketnl-arrivals')>,
 b'ndovloketnl-departures': <pykafka.topic.Topic at 0x7f89d4082f28 (name=b'ndovloketnl-departures')>,
 b'ndovloketnl-gps': <pykafka.topic.Topic at 0x7f89d570fdd8 (name=b'ndovloketnl-gps')>,
 b'test-ndovloketnl-arrivals': <pykafka.topic.Topic at 0x7f89d4082470 (name=b'test-ndovloketnl-arrivals')>,
 b'test-ndovloketnl-departures': <pykafka.topic.Topic at 0x7f89d407b0b8 (name=b'test-ndovloketnl-departures')>,
 b'test-ndovloketnl-gps': <p

As you can see, some topics already exist and, unfortunately, there is no separation per user. So be respectful of your colleagues' work and of the infrastructure we put in place. More specifically, never write to a topic that was not created by you or your teammate (unless expressly asked to). Also be careful not to create infinite loops by publishing to the same topic you are subscribed to without filtering or exit conditions.

**1.b** That being said, let's create our own topic. Don't forget to make it unique with your username.

In [3]:
name = ('test-topic-' + username).encode()
tt = client.topics[name]

Kafka is configured to auto-create topics that do not exist yet. You can check the list of topics; yours should be there now.

**1.c** To write a message to the topic you need to instantiate a Producer. For simplicity, and as we won't deal with very large (or very fast) data, you can use a synchronous producer. Then send a series of 10 different messages (for example "Hello world {i}!" with i from 0 to 9). You might need to encode the string to UTF-8 before sending:

In [4]:
with tt.get_sync_producer() as producer:
    for i in range(10):
        producer.produce("Hello world {}!".format(i).encode())
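
Kafka message payloads are raw bytes, which is why the string is encoded before it is produced. The following minimal sketch (plain Python, no Kafka connection) shows the round trip a producer and a consumer would perform on the payload:

```python
# Kafka payloads are bytes, so text is encoded before producing
# and decoded after consuming. UTF-8 is made explicit here.
messages = ["Hello world {}!".format(i).encode("utf-8") for i in range(10)]

# What a consumer would do with message.value to get text back:
decoded = [m.decode("utf-8") for m in messages]

print(messages[0])
print(decoded[-1])
```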

**1.d** Now let's check what is in our topic and subscribe to it. Again, for simplicity, you can use the basic `get_simple_consumer`. Notice that the reading loop never finishes: streams are infinite! You must interrupt it manually. You may want to catch the KeyboardInterrupt to avoid polluting the output with a stack trace:

In [5]:
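# Without a consumer group, the consumer starts from the earliest offset on every run.
consumer = tt.get_simple_consumer()
try:
    for message in consumer:
        if message is not None:
            print(message.offset, message.value)
except KeyboardInterrupt:
    # Raised when the cell is interrupted manually.
    print("Consumer stopped.")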

0 b'Hello world 0!'
1 b'Hello world 1!'
2 b'Hello world 2!'
3 b'Hello world 3!'
4 b'Hello world 4!'
5 b'Hello world 5!'
6 b'Hello world 6!'
7 b'Hello world 7!'
8 b'Hello world 8!'
9 b'Hello world 9!'
Consumer stopped.


If you run the previous cell several times you will notice that it always restarts from the beginning by default. We have configured Kafka to persist the data for quite a long time, so also be careful not to leave data producers running needlessly.

This is useful for testing and for re-running the same pipeline several times. In a production setting, however, you may want to remember the last message seen and continue from there. Luckily, Kafka can do this for you if you define a consumer group and commit the current offset (see the documentation: http://pykafka.readthedocs.io/en/latest/usage.html#consumer-patterns ). You can either set `auto_commit_enable` or call the `commit_offsets` method manually.
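
To make the role of committed offsets concrete, here is a toy in-memory model written in plain Python. It does not talk to Kafka, and `ToyTopic` and its methods are hypothetical names invented for this illustration, not part of pykafka. It only sketches the idea that a consumer group remembers where it stopped, while a consumer without a group starts over.

```python
class ToyTopic:
    """In-memory stand-in for a single-partition topic (illustration only)."""

    def __init__(self):
        self.log = []        # append-only list of messages
        self.committed = {}  # consumer group -> next offset to read

    def produce(self, value):
        self.log.append(value)

    def consume(self, group=None):
        """Return the unread (offset, value) pairs.

        Without a group, reading always starts at offset 0.
        With a group, reading resumes at the committed offset
        and the new position is committed afterwards.
        """
        start = self.committed.get(group, 0) if group is not None else 0
        batch = list(enumerate(self.log[start:], start=start))
        if group is not None:
            self.committed[group] = len(self.log)
        return batch


topic = ToyTopic()
for i in range(3):
    topic.produce("Hello world {}!".format(i))

first = topic.consume(group="test-group")   # offsets 0, 1, 2
topic.produce("Hello world 3!")
second = topic.consume(group="test-group")  # only the new message
replay = topic.consume()                    # no group: everything again

print(first, second, replay, sep="\n")
```

In the real system the committed offsets are stored by Kafka, so they survive restarts of the notebook kernel.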

**1.e** Rewrite the above consumer loop to only show the newest elements every time it is run. You can modify and re-run the producing loop to add some more messages.

In [6]:
# Alternate between this cell and producer cell [4], multiple times.
# Repeat with other values of OffsetType, e.g. OffsetType.LATEST
from pykafka.common import OffsetType

# Create a consumer that resumes from the oldest unread message of its group
consumer = tt.get_simple_consumer(
    consumer_group=('test-group-' + username).encode(),
    auto_offset_reset=OffsetType.EARLIEST,
    auto_commit_enable=True,
    reset_offset_on_start=False,
    auto_commit_interval_ms=2000)

# Consume and print the messages.
# Re-run the "Hello world" producer cell to add more messages before each execution of this code
try:
    for message in consumer:
        if message is not None:
            print(message.offset, message.value)
except KeyboardInterrupt:
    print("Consumer stopped.")
    pass

0 b'Hello world 0!'
1 b'Hello world 1!'
2 b'Hello world 2!'
3 b'Hello world 3!'
4 b'Hello world 4!'
5 b'Hello world 5!'
6 b'Hello world 6!'
7 b'Hello world 7!'
8 b'Hello world 8!'
9 b'Hello world 9!'
Consumer stopped.


We will now generate a little more data and see how to make a live plot. We can use another topic, like `b"test-random-<insert your username>"`.

**1.f** First write an infinite generator (or just a for loop) that outputs the 2D coordinates of a random walk (here you can simply take a random walk on a grid, by taking a random jump of length 1 in one of the 4 directions at each step). Don't forget to add a short pause (e.g. `time.sleep(1)`) after each message you publish to Kafka. Run the producer for a few seconds, then stop it.

In [7]:
tr = client.topics[('test-random-' + username).encode()]

from random import choice
import time

def random_walk(start):
    directions = [(0,1), (0, -1), (1, 0), (-1, 0)]
    position = start
    while True:
        c = choice(directions)
        yield position
        position = (position[0] + c[0], position[1] + c[1])
        

with tr.get_sync_producer() as producer:
    n = 0  # number of messages published so far
    try:
        for p in random_walk((0, 0)):
            # Publish the position as "x,y"
            producer.produce("{},{}".format(*p).encode())
            n += 1
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("Producer stopped after {0} messages".format(n))

Producer stopped after 172 messages


**1.g** Now we will use the library bokeh ( https://bokeh.pydata.org/en/latest/docs/user_guide.html ) for plotting. It allows live updates of the graph in the notebook by pushing data from the kernel. First you need to create a line plot with a ColumnDataSource initialized with empty lists for x and y. Then, in the consuming loop, you will parse the message and update the data of the source. Make sure your code would work for a long-running job without eating all the memory ;-)

In [8]:
from bokeh.io import push_notebook, show, output_notebook
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource, Range1d
import time

output_notebook()

x=[]
y=[]
source = ColumnDataSource(data=dict(x=x, y=y))

p = figure(plot_width=850,plot_height=450)
p.line('x', 'y', source=source)

p.x_range = Range1d(-50, 50)
p.y_range = Range1d(-50, 50)

handle=show(p, notebook_handle=True)

consumer = client.topics[('test-random-' + username).encode()].get_simple_consumer(consumer_group=('test-group-' + username).encode(), auto_offset_reset=OffsetType.EARLIEST, auto_commit_enable=True, reset_offset_on_start=False, auto_commit_interval_ms=2000)

try:
    for message in consumer:
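        if message is not None:
            # Parse the "x,y" payload into integers so that the plot receives numeric values
            xx, yy = message.value.decode().split(",")
            x.append(int(xx))
            y.append(int(yy))
            # keep only the 20 most recent points so that memory stays bounded
            if len(x) > 20:
                x.pop(0)
                y.pop(0)

            source.data = dict(x=x, y=y)
            push_notebook(handle=handle)
            time.sleep(0.1)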
except KeyboardInterrupt:
    print("Consumer stopped.")
    pass

Consumer stopped.
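
The plotting loop above keeps memory bounded by popping the oldest point by hand. As an alternative sketch (plain Python, no Kafka or bokeh needed), `collections.deque` with `maxlen` gives the same sliding buffer. The helper name `handle_message` and the simulated messages are assumptions made for this illustration.

```python
from collections import deque

WINDOW = 20  # number of points kept, as in the loop above

# deque(maxlen=...) drops the oldest item automatically when full,
# so memory stays bounded however long the stream runs.
xs = deque(maxlen=WINDOW)
ys = deque(maxlen=WINDOW)

def handle_message(raw):
    """Parse one b'x,y' payload and append it to the bounded buffers."""
    x_str, y_str = raw.decode().split(",")
    xs.append(int(x_str))
    ys.append(int(y_str))
    # Copy to plain lists, matching what the plotting cell assigns to source.data.
    return dict(x=list(xs), y=list(ys))

# Simulate 100 incoming messages.
for i in range(100):
    data = handle_message("{},{}".format(i, -i).encode())

print(len(data["x"]), data["x"][0], data["x"][-1])
```

In the notebook, the returned dictionary would be assigned to `source.data` before calling `push_notebook`.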


Note that in a notebook you can only run one cell at a time. You must therefore stop the producer before you can run the consumer. If you want to play with the producer and the consumer at the same time, feel free to copy one of them to another notebook or to a Python script so that you can run both simultaneously. Do not forget to include the cells needed to initialize the Kafka client as well. If you are working with a teammate (e.g. one is the producer and the other is the consumer), make sure that you agree on the topic name.

## 2. Spark streaming

In the last module, you have experimented with Spark, processing batches of data in RDDs and dataframes. Spark also supports "streaming", namely it defines a stream as a sequence of mini-batches. You can find a pretty detailed introduction here: https://spark.apache.org/docs/2.3.2/streaming-programming-guide.html. Pay particular attention to the section [_points to remember_](https://spark.apache.org/docs/2.3.2/streaming-programming-guide.html#points-to-remember).

In [9]:
import pyspark
from pyspark.sql import SparkSession
import os

os.environ['PYSPARK_PYTHON'] = '/opt/anaconda3/bin/python'

conf = pyspark.conf.SparkConf()
conf.setMaster('yarn')
conf.setAppName('streaming-{0}'.format(username))
conf.set('spark.executor.memory', '1g')
conf.set('spark.executor.instances', '1')
conf.set('spark.executor.cores', '2') # must be greater than the number of sources
conf.set('spark.port.maxRetries', '100')
#conf.set('spark.jars.packages', 'org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.2')
conf.set('spark.jars.packages', 'org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.2')
sc = pyspark.SparkContext.getOrCreate(conf)
conf = sc.getConf()
sc

In [10]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a StreamingContext with a batch interval of 2 seconds.
# Each time you stop a StreamingContext, you will need to recreate it here.
ssc = StreamingContext(sc, 2)
ssc.checkpoint('hdfs:///homes/{}/checkpoint/'.format(username))

You can open the Spark UI by following the link in the output above. You should notice a new tab (only while a stream is running) showing the processing of each mini-batch.

### 2.1 From an RDD queue

We will start with a very simple stream, out of a list of RDDs generated manually. Here is an example of RDDs with a quite simple counter. 

```
rdds = []
for i in range(20):
    rdds.append(sc.range(i * 100, (i + 1) * 100))

```

**2.1.a** In the same vein, generate a list of RDDs with random numbers with a uniform distribution. (hint: have a look at the [spark MLlib package](https://spark.apache.org/docs/2.3.2/api/python/pyspark.mllib.html))

In [11]:
from pyspark.mllib.random import RandomRDDs
rdds = []

# TODO: create the RDD of random values in range [0,1)
for i in range(20):
    rdds.append(RandomRDDs.uniformRDD(sc, 1))

Now let's create the stream and print it from the stream processor. The Python API for Spark Streaming has only a few options to get your results back: `pprint` is the most convenient for debugging and testing, `saveAsTextFiles` can be used for persisting the output, and the most generic one is `foreachRDD`, which can execute any function. ( https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams )

Once you start the stream, you can check on the UI what is happening, and you should see the output here after each batch. You will need to call stop explicitly, after which you must re-instantiate the streaming context if you want to start a new stream or restart this one.

**2.1.b** Run the stream and don't forget to stop it after a few seconds. But do not stop the Spark context yet.

In [12]:
dstream = ssc.queueStream(rdds)

# Print the first ten elements of each RDD generated in this DStream to the console
dstream.pprint()
ssc.start()

-------------------------------------------
Time: 2019-05-07 18:53:46
-------------------------------------------
0.16020278464960847

-------------------------------------------
Time: 2019-05-07 18:53:48
-------------------------------------------
0.020139977846114543

-------------------------------------------
Time: 2019-05-07 18:53:50
-------------------------------------------
0.7264532416878353



In [13]:
ssc.stop(stopSparkContext=False)

**Note** that the stream outputs a value at 2-second intervals. This interval corresponds to the length of a micro-batch. It is configurable when you create the StreamingContext.
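
As a rough mental model of micro-batching, the sketch below groups timestamped events into 2-second buckets in plain Python. It is only an illustration under simplifying assumptions: the function `to_micro_batches` and the sample events are made up, and real Spark Streaming assigns data to batches as it is received rather than from timestamps carried by the events.

```python
from collections import defaultdict

BATCH_INTERVAL = 2  # seconds, as in StreamingContext(sc, 2)

def to_micro_batches(events, interval=BATCH_INTERVAL):
    """Group (timestamp, value) events by the batch interval they fall in."""
    batches = defaultdict(list)
    for ts, value in events:
        batch_start = int(ts // interval) * interval
        batches[batch_start].append(value)
    return dict(sorted(batches.items()))

# Made-up events: (arrival time in seconds, payload)
events = [(0.3, "a"), (1.9, "b"), (2.0, "c"), (3.5, "d"), (6.1, "e")]
batches = to_micro_batches(events)

for start, values in batches.items():
    print("batch starting at t={}s: {}".format(start, values))
```

Note that this toy version simply skips the interval starting at t=4s, whereas a running stream still produces an (empty) batch for every interval, as the empty outputs later in this notebook show.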

Now let's get back to our random walk example. This demonstrates the capacity of Spark Streaming to maintain a current state and apply updates to it. The state is backed up in your user folder on HDFS (as set while creating the streaming context), and you could pass this folder when recreating the context to restart from the point where you stopped it.

Do not forget to stop your StreamingContext (with `stopSparkContext=False`) when you are done. And remember to recreate the StreamingContext after you stop it, before reusing it.

**2.1.c** Apply some transformations on the stream of random data such that it outputs 2D coordinates of a random walk.

In [14]:
# Recreate the streaming context, because it has been stopped
ssc = StreamingContext(sc, 2)
ssc.checkpoint('hdfs:///homes/{}/checkpoint/'.format(username))

In [15]:
import math
directions = [(0,1), (0, -1), (1, 0), (-1, 0)]

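def walk(rnd, pos):
    # rnd: list of new random values for the key in this batch
    # pos: previous position (None before the first update)
    if pos is None:
        pos = (0, 0)
    if rnd:
        # Map a uniform value in [0, 1) to one of the 4 directions
        d = directions[math.floor(rnd[0] * 4)]
        return (pos[0] + d[0], pos[1] + d[1])
    else:
        # No new value in this batch: returning None drops the state
        return None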

In [16]:
dstream = ssc.queueStream(rdds)
dstream = dstream.map(lambda x: (1, x)).updateStateByKey(walk)
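
To see what `updateStateByKey` does with the `walk` function, the following plain-Python sketch replays the same update logic on three hand-written batches for a single key. The helper `run_batches` is a hypothetical stand-in written for this illustration and is not a Spark API. The input values are fixed rather than random so that the result can be followed by hand.

```python
import math

directions = [(0, 1), (0, -1), (1, 0), (-1, 0)]

def walk(rnd, pos):
    """Same update logic as above: rnd holds the new values of the batch,
    pos is the previous state (None before the first update)."""
    if pos is None:
        pos = (0, 0)
    if rnd:
        d = directions[math.floor(rnd[0] * 4)]
        return (pos[0] + d[0], pos[1] + d[1])
    return None

def run_batches(batches, update):
    """Toy model of a stateful update for a single key."""
    state = None
    history = []
    for new_values in batches:
        state = update(new_values, state)
        history.append(state)
    return history

# Three micro-batches, each holding one value in [0, 1).
history = run_batches([[0.1], [0.6], [0.9]], walk)
print(history)
```

The values 0.1, 0.6 and 0.9 select the directions (0, 1), (1, 0) and (-1, 0), so the walk visits (0, 1), (1, 1) and then (0, 1) again.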

**2.1.d** Modify the following output function such that it produces data that can then be plotted by the same code you wrote in the previous section.

In [17]:
from pykafka import KafkaClient

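def sendToKafka(records):
    # Runs on the executors: each partition opens its own Kafka connection
    client = KafkaClient(zookeeper_hosts=ZOOKEEPER_QUORUM)
    with client.topics[('test-random-' + username).encode()].get_sync_producer() as producer:
        for record in records:
            # record is a (key, (x, y)) pair produced by updateStateByKey
            producer.produce("{},{}".format(*record[1]).encode())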

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendToKafka))

In [18]:
#dstream.map(lambda x: len(x[1])).foreachRDD(lambda rdd: rdd.foreachPartition(sendToKafka))

**2.1.e** Now you can start it and, while it is running, launch the plot cell too. Note that it will stop producing events after all the elements of `rdds` have been processed. You can stop the StreamingContext at that point.

In [19]:
ssc.start()  # Start the computation

You can run the same code as before for plotting the random walk. We copy it below for illustration purposes, to maintain the chronological order of the exercises in this notebook, but you can also run it from the previous cell.

In [20]:
from bokeh.io import push_notebook, show, output_notebook
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource, Range1d
import time

output_notebook()

x=[]
y=[]
source = ColumnDataSource(data=dict(x=x, y=y))

p = figure(plot_width=850,plot_height=450)
p.line('x', 'y', source=source)

p.x_range = Range1d(-50, 50)
p.y_range = Range1d(-50, 50)

handle=show(p, notebook_handle=True)

consumer = client.topics[('test-random-' + username).encode()].get_simple_consumer(consumer_group=('test-group-' + username).encode(), auto_offset_reset=OffsetType.EARLIEST, auto_commit_enable=True, reset_offset_on_start=False, auto_commit_interval_ms=2000)

try:
    for message in consumer:
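        if message is not None:
            # Parse the "x,y" payload into integers so that the plot receives numeric values
            xx, yy = message.value.decode().split(",")
            x.append(int(xx))
            y.append(int(yy))
            # keep only the 20 most recent points so that memory stays bounded
            if len(x) > 20:
                x.pop(0)
                y.pop(0)

            source.data = dict(x=x, y=y)
            push_notebook(handle=handle)
            time.sleep(0.1)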
except KeyboardInterrupt:
    print("Consumer stopped.")
    pass

Consumer stopped.


In [21]:
ssc.stop(stopSparkContext=False) # Don't forget to stop the stream when you are done

### 2.2 From Kafka

In [None]:
# Run this if you need to start from fresh
sc.stop()
sc = pyspark.SparkContext.getOrCreate(conf)
sc

Here, we will consume messages from the `wiki-edits` topic. It contains the stream of events fetched from the Wikimedia sites.
More info: https://www.mediawiki.org/wiki/API:Recent_changes_stream



In [22]:
from pyspark.streaming.kafka import KafkaUtils

In [23]:
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 10)

In [24]:
dstream = KafkaUtils.createStream(ssc, ZOOKEEPER_QUORUM, 'test-wiki-group-' + username, {'wiki-edits': 1})

In [25]:
dstream.pprint()

In [26]:
ssc.start()

-------------------------------------------
Time: 2019-05-07 18:57:10
-------------------------------------------
(None, '{"bot":false,"comment":"redirect","id":1151956151,"length":{"new":35807,"old":35782},"meta":{"domain":"en.wikipedia.org","dt":"2019-05-07T15:06:10+00:00","id":"a6ca4af2-70d9-11e9-b9b9-1866da992e4e","request_id":"XNGe4QpAICsAAFzIajEAAAAF","schema_uri":"mediawiki/recentchange/2","topic":"eqiad.mediawiki.recentchange","uri":"https://en.wikipedia.org/wiki/2013_FIBA_Africa_Clubs_Champions_Cup_squads","partition":0,"offset":1577206706},"minor":false,"namespace":0,"parsedcomment":"redirect","revision":{"new":895958978,"old":866915590},"server_name":"en.wikipedia.org","server_script_path":"/w","server_url":"https://en.wikipedia.org","timestamp":1557241570,"title":"2013 FIBA Africa Clubs Champions Cup squads","type":"edit","user":"Ortizesp","wiki":"enwiki"}\n')
(None, '{"bot":false,"comment":"Reverted edits by [[Special:Contribs/Burmasips|Burmasips]] ([[User talk:Burmasips|t

In [27]:
ssc.stop(stopSparkContext=False, stopGraceFully=True)

-------------------------------------------
Time: 2019-05-07 18:57:20
-------------------------------------------
(None, '{"bot":false,"comment":"/* wbsetlabel-add:1|sq */ Samuel Ruiz Fuertes, #quickstatements; [[:toollabs:quickstatements/#/batch/12444|batch #12444]] by [[User:Liridon|]]","id":972636986,"length":{"new":10445,"old":10367},"meta":{"domain":"www.wikidata.org","dt":"2019-05-07T16:57:08+00:00","id":"26abf0b4-70e9-11e9-877f-b083fecf0da3","request_id":"XNG45ApAMEAAAK344okAAABT","schema_uri":"mediawiki/recentchange/2","topic":"eqiad.mediawiki.recentchange","uri":"https://www.wikidata.org/wiki/Q6118196","partition":0,"offset":1577365970},"minor":false,"namespace":0,"parsedcomment":"\u200e<span dir=\\"auto\\"><span class=\\"autocomment\\">Shtoi etiketën [sq]: </span></span> Samuel Ruiz Fuertes, #quickstatements; <a href=\\"https://tools.wmflabs.org/quickstatements/#.2Fbatch.2F12444\\" class=\\"extiw\\" title=\\"toollabs:quickstatements/\\">batch #12444</a> by <a href=\\"/wiki/Us

-------------------------------------------
Time: 2019-05-07 18:57:30
-------------------------------------------

-------------------------------------------
Time: 2019-05-07 18:57:40
-------------------------------------------



Prototyping spark transformations on a stream can be cumbersome, so we can also create a static RDD from Kafka to experiment first.

In [28]:
from pyspark.streaming.kafka import OffsetRange

In [29]:
wiki_rdd_source = KafkaUtils.createRDD(sc, {'bootstrap.servers': 'iccluster042.iccluster.epfl.ch:6667'}, [OffsetRange('wiki-edits',0,10000,10100)])

The code above creates an RDD from partition `0` of the `wiki-edits` Kafka topic, going from offset `10000` (included) to offset `10100` (excluded), i.e. 100 messages.

In [30]:
wiki_rdd_source.first()

(None,
 '{"bot":false,"comment":"","log_action":"patrol","log_action_comment":"marked revision 19348481 of [[Blue Öyster Cult]] patrolled ","log_id":15167288,"log_params":{"auto":0,"curid":"19348481","previd":"19281811"},"log_type":"patrol","meta":{"domain":"no.wikipedia.org","dt":"2019-05-01T08:43:07+00:00","id":"242f2dcd-6bed-11e9-8a74-1866da9926fb","request_id":"6c57ed8a-1a48-47ee-afec-e116516f0d45","schema_uri":"mediawiki/recentchange/2","topic":"eqiad.mediawiki.recentchange","uri":"https://no.wikipedia.org/wiki/Blue_%C3%96yster_Cult","partition":0,"offset":1561941160},"namespace":0,"parsedcomment":"","server_name":"no.wikipedia.org","server_script_path":"/w","server_url":"https://no.wikipedia.org","timestamp":1556700187,"title":"Blue Öyster Cult","type":"log","user":"TommyG","wiki":"nowiki"}\n')

In [31]:
wiki_rdd_source.count()

100

We can see that we got the same type of data as in the stream above.

Now, we can write the function to parse the json messages.

Note: some messages are not valid JSON, so the parser returns a list (empty on failure) and we use `flatMap` to discard the invalid ones.

In [32]:
import json

def parse_wiki(doc):
    try:
        return [json.loads(doc)]
    except json.decoder.JSONDecodeError:
        return []

In [33]:
wiki_rdd = wiki_rdd_source.flatMap(lambda x: parse_wiki(x[1]))
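
The reason `flatMap` discards invalid messages is that each element is mapped to a list and the lists are then concatenated, so an empty list contributes nothing. Here is a plain-Python sketch of that behaviour on three made-up records (not taken from the topic), using `itertools.chain` in place of Spark:

```python
import json
from itertools import chain

def parse_wiki(doc):
    """Return [parsed] for valid JSON and [] otherwise (same logic as above)."""
    try:
        return [json.loads(doc)]
    except json.decoder.JSONDecodeError:
        return []

# Made-up (key, value) records shaped like the Kafka messages above.
raw = [
    (None, '{"type": "edit", "wiki": "enwiki"}\n'),
    (None, '{"type": "edit", "wiki":'),  # truncated, invalid JSON
    (None, '{"type": "log", "wiki": "nowiki"}\n'),
]

# flatMap = map each element to a list, then concatenate the lists.
parsed = list(chain.from_iterable(parse_wiki(value) for _, value in raw))

print(len(raw), "->", len(parsed))
```

With a plain `map`, the invalid record would instead show up as an empty list in the result and would have to be filtered out in a separate step.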

In [34]:
wiki_rdd.first()

{'bot': False,
 'comment': '',
 'log_action': 'patrol',
 'log_action_comment': 'marked revision 19348481 of [[Blue Öyster Cult]] patrolled ',
 'log_id': 15167288,
 'log_params': {'auto': 0, 'curid': '19348481', 'previd': '19281811'},
 'log_type': 'patrol',
 'meta': {'domain': 'no.wikipedia.org',
  'dt': '2019-05-01T08:43:07+00:00',
  'id': '242f2dcd-6bed-11e9-8a74-1866da9926fb',
  'offset': 1561941160,
  'partition': 0,
  'request_id': '6c57ed8a-1a48-47ee-afec-e116516f0d45',
  'schema_uri': 'mediawiki/recentchange/2',
  'topic': 'eqiad.mediawiki.recentchange',
  'uri': 'https://no.wikipedia.org/wiki/Blue_%C3%96yster_Cult'},
 'namespace': 0,
 'parsedcomment': '',
 'server_name': 'no.wikipedia.org',
 'server_script_path': '/w',
 'server_url': 'https://no.wikipedia.org',
 'timestamp': 1556700187,
 'title': 'Blue Öyster Cult',
 'type': 'log',
 'user': 'TommyG',
 'wiki': 'nowiki'}

In [35]:
wiki_rdd.count()

100

We can also use this on the stream:

In [36]:
ssc = StreamingContext(sc, 10)
dstream = KafkaUtils.createStream(ssc, ZOOKEEPER_QUORUM, 'test-wiki-group-' + username, {'wiki-edits': 1})

In [37]:
wiki_dstream = dstream.flatMap(lambda x: parse_wiki(x[1]))

In [38]:
wiki_dstream.pprint()

In [39]:
ssc.start()

-------------------------------------------
Time: 2019-05-07 18:58:30
-------------------------------------------
{'bot': True, 'comment': '[[:Module talk:Adjacent stations/NJ Transit/doc]] added to category', 'id': 1151981591, 'meta': {'domain': 'en.wikipedia.org', 'dt': '2019-05-07T16:57:13+00:00', 'id': '29a6d79e-70e9-11e9-b37a-1866da97f886', 'request_id': '627e0689a361a13a467d8e79', 'schema_uri': 'mediawiki/recentchange/2', 'topic': 'eqiad.mediawiki.recentchange', 'uri': 'https://en.wikipedia.org/wiki/Category:Unprintworthy_redirects', 'partition': 0, 'offset': 1577366095}, 'namespace': 14, 'parsedcomment': '<a href="/wiki/Module_talk:Adjacent_stations/NJ_Transit/doc" class="mw-redirect" title="Module talk:Adjacent stations/NJ Transit/doc">Module talk:Adjacent stations/NJ Transit/doc</a> added to category', 'server_name': 'en.wikipedia.org', 'server_script_path': '/w', 'server_url': 'https://en.wikipedia.org', 'timestamp': 1557248233, 'title': 'Category:Unprintworthy redirects', 't

In [40]:
ssc.stop(stopSparkContext=False, stopGraceFully=True)

-------------------------------------------
Time: 2019-05-07 18:58:40
-------------------------------------------
{'bot': False, 'comment': '', 'log_action': 'hit', 'log_action_comment': '83.30.23.47 uruchomił(a) [[Specjalna:Filtr nadużyć/3|filtr 3]], wykonując „edit” na [[MEGAWONSZ]]. Podjęte działania: Ostrzeżenie ([[Specjalna:Rejestr nadużyć/440439|szczegóły]])', 'log_id': 0, 'log_params': {'action': 'edit', 'actions': 'warn', 'filter': 3, 'log': 440439}, 'log_type': 'abusefilter', 'meta': {'domain': 'pl.wikipedia.org', 'dt': '2019-05-07T16:58:29+00:00', 'id': '565af7b9-70e9-11e9-80c2-141877613814', 'request_id': 'XNG5NApAADwAAG5tcREAAABQ', 'schema_uri': 'mediawiki/recentchange/2', 'topic': 'eqiad.mediawiki.recentchange', 'uri': 'https://pl.wikipedia.org/wiki/MEGAWONSZ', 'partition': 0, 'offset': 1577367746}, 'namespace': 0, 'parsedcomment': '', 'server_name': 'pl.wikipedia.org', 'server_script_path': '/w', 'server_url': 'https://pl.wikipedia.org', 'timestamp': 1557248309, 'title': 

-------------------------------------------
Time: 2019-05-07 18:58:50
-------------------------------------------

-------------------------------------------
Time: 2019-05-07 18:59:00
-------------------------------------------



**2.2.a** Create a stream from `wiki-edits` which contains only the `type` and `wiki` fields.

In [41]:
ssc = StreamingContext(sc, 10)
dstream = KafkaUtils.createStream(ssc, ZOOKEEPER_QUORUM, 'test-wiki-group-' + username, {'wiki-edits': 1})
wiki_dstream = dstream.flatMap(lambda x: parse_wiki(x[1]))

wiki_dstream2 = wiki_dstream.map(lambda x: {'type': x['type'], 'wiki': x['wiki']})
wiki_dstream2.pprint()

ssc.start()

In [42]:
ssc.stop(stopSparkContext=False, stopGraceFully=True)

-------------------------------------------
Time: 2019-05-07 18:59:30
-------------------------------------------
{'type': 'edit', 'wiki': 'enwiktionary'}
{'type': 'edit', 'wiki': 'frwiki'}
{'type': 'new', 'wiki': 'itwiki'}
{'type': 'log', 'wiki': 'commonswiki'}
{'type': 'new', 'wiki': 'enwiki'}
{'type': 'edit', 'wiki': 'etwiki'}
{'type': 'edit', 'wiki': 'wikidatawiki'}
{'type': 'categorize', 'wiki': 'commonswiki'}
{'type': 'categorize', 'wiki': 'commonswiki'}
{'type': 'edit', 'wiki': 'enwiki'}
...

-------------------------------------------
Time: 2019-05-07 18:59:40
-------------------------------------------

-------------------------------------------
Time: 2019-05-07 18:59:50
-------------------------------------------



### 2.3 Window operations

We have seen how to turn incoming data into a stream of Spark RDDs and how to apply simple transformations to them. Spark also lets you perform window operations on streams.

Documentation: https://spark.apache.org/docs/2.3.0/streaming-programming-guide.html#window-operations

Let's have a look at what we can get from the `wiki-edits` topic. We will perform operations on 10-minute windows, sliding every 2 seconds.

As a simple example, here is how to count the number of messages for such a window.

In [43]:
ssc = StreamingContext(sc, 2)
ssc.checkpoint('hdfs:///homes/{}/wikicheck/'.format(username))
dstream = KafkaUtils.createStream(ssc, ZOOKEEPER_QUORUM, 'test-wiki-group-' + username, {'wiki-edits': 1})
wiki_dstream = dstream.flatMap(lambda x: parse_wiki(x[1]))

In [44]:
# 600s = 10minutes, every 2 seconds
wiki_counts = wiki_dstream.countByWindow(600, 2)

In [45]:
wiki_counts.pprint()
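
As a sketch of what a 600-second window sliding every 2 seconds computes, the plain-Python model below keeps the sizes of the last 300 batches and sums them at each step. The function `windowed_counts` and the steady rate of 10 messages per batch are assumptions made for this illustration. It models the result only, not how Spark computes it.

```python
from collections import deque

WINDOW = 600  # window length in seconds
SLIDE = 2     # slide interval in seconds (equal to the batch interval here)

def windowed_counts(batch_sizes, window=WINDOW, slide=SLIDE):
    """Yield, after each batch, the number of messages seen in the last `window` seconds."""
    recent = deque(maxlen=window // slide)  # sizes of the last 300 batches
    for size in batch_sizes:
        recent.append(size)
        yield sum(recent)

# 400 batches of 10 messages each, i.e. 800 seconds of a steady stream.
counts = list(windowed_counts([10] * 400))

print(counts[0], counts[149], counts[299], counts[399])
```

The count grows while the window fills up and then levels off once 10 minutes of data are covered, at which point old batches leave the window as fast as new ones enter it.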

In [46]:
ssc.start()

-------------------------------------------
Time: 2019-05-07 19:00:18
-------------------------------------------
2206

-------------------------------------------
Time: 2019-05-07 19:00:20
-------------------------------------------
2259

-------------------------------------------
Time: 2019-05-07 19:00:22
-------------------------------------------
2281

-------------------------------------------
Time: 2019-05-07 19:00:24
-------------------------------------------
2341

-------------------------------------------
Time: 2019-05-07 19:00:26
-------------------------------------------
2378

-------------------------------------------
Time: 2019-05-07 19:00:28
-------------------------------------------
2419

-------------------------------------------
Time: 2019-05-07 19:00:30
-------------------------------------------
2468

-------------------------------------------
Time: 2019-05-07 19:00:32
-------------------------------------------
2526

----------------------------------------

In [47]:
ssc.stop(stopSparkContext=False, stopGraceFully=True)

-------------------------------------------
Time: 2019-05-07 19:01:26
-------------------------------------------
3704

-------------------------------------------
Time: 2019-05-07 19:01:28
-------------------------------------------
3704

-------------------------------------------
Time: 2019-05-07 19:01:30
-------------------------------------------
3704

-------------------------------------------
Time: 2019-05-07 19:01:32
-------------------------------------------
3704

-------------------------------------------
Time: 2019-05-07 19:01:34
-------------------------------------------
3704

-------------------------------------------
Time: 2019-05-07 19:01:36
-------------------------------------------
3704

-------------------------------------------
Time: 2019-05-07 19:01:38
-------------------------------------------
3704

-------------------------------------------
Time: 2019-05-07 19:01:40
-------------------------------------------
3704

----------------------------------------

Look at the documentation to see how to use the window operations.

**2.3.a** Create a stream that counts the actions done by a bot and those done by humans within the time window.

**2.3.b** Create a stream that gets the number of actions by `type`. Also get the number of actions by `wiki`.

**2.3.c** Send the above streams to Kafka topics (example in **2.1**) and use that to dynamically plot:
- the volume of actions by bots vs actions by humans,
- the volume of actions by `type` and by `wiki`.

Use `wiki-plot-bot-<username>`, `wiki-plot-type-<username>` and `wiki-plot-wiki-<username>` for the respective topic names.
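
For **2.3.c**, the counts have to be serialized into bytes before being published and parsed again on the plotting side. The sketch below shows one possible message format, a single JSON object per window. This format and the helper names `encode_counts` and `decode_counts` are assumptions made for illustration, not something the exercise prescribes, and the sample values are only placeholders.

```python
import json

def encode_counts(pairs):
    """Turn the (key, count) pairs of one window into a single JSON message."""
    return json.dumps(dict(pairs)).encode()

def decode_counts(raw):
    """Inverse of encode_counts, for the plotting side."""
    return json.loads(raw.decode())

# One window of (key, count) pairs with placeholder values.
window = [("human", 868), ("bot", 289)]

message = encode_counts(window)  # what the producer would send
counts = decode_counts(message)  # what the consumer would plot

print(message)
print(counts["human"], counts["bot"])
```

Because the pairs of one window may be spread over several partitions, sending one message per window requires gathering them first (the counts are small), or alternatively sending one message per pair as in the example of **2.1**.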

In [48]:
#Answer 2.3.a: create a stream and count actions by bot vs actions by humans
ssc = StreamingContext(sc, 2)
ssc.checkpoint('hdfs:///homes/{}/wikicheck/'.format(username))
dstream = KafkaUtils.createStream(ssc, ZOOKEEPER_QUORUM, 'test-wiki-group-' + username, {'wiki-edits': 1})
wiki_dstream = dstream.flatMap(lambda x: parse_wiki(x[1]))

wiki_count_bot = wiki_dstream.map(lambda x: 'bot' if x['bot'] else 'human').countByValueAndWindow(600, 2)

wiki_count_bot.pprint()
ssc.start()

-------------------------------------------
Time: 2019-05-07 19:02:06
-------------------------------------------
('human', 868)
('bot', 289)

-------------------------------------------
Time: 2019-05-07 19:02:08
-------------------------------------------
('human', 896)
('bot', 302)

-------------------------------------------
Time: 2019-05-07 19:02:10
-------------------------------------------
('human', 918)
('bot', 312)

-------------------------------------------
Time: 2019-05-07 19:02:12
-------------------------------------------
('human', 971)
('bot', 338)

-------------------------------------------
Time: 2019-05-07 19:02:14
-------------------------------------------
('human', 1015)
('bot', 355)

-------------------------------------------
Time: 2019-05-07 19:02:16
-------------------------------------------
('human', 1044)
('bot', 365)

-------------------------------------------
Time: 2019-05-07 19:02:18
-------------------------------------------
('human', 1078)
('bot', 38

In [49]:
ssc.stop(stopSparkContext=False, stopGraceFully=True)

-------------------------------------------
Time: 2019-05-07 19:02:20
-------------------------------------------
('human', 1096)
('bot', 396)

-------------------------------------------
Time: 2019-05-07 19:02:22
-------------------------------------------
('human', 1096)
('bot', 396)

-------------------------------------------
Time: 2019-05-07 19:02:24
-------------------------------------------
('human', 1096)
('bot', 396)

-------------------------------------------
Time: 2019-05-07 19:02:26
-------------------------------------------
('human', 1096)
('bot', 396)

-------------------------------------------
Time: 2019-05-07 19:02:28
-------------------------------------------
('human', 1096)
('bot', 396)

-------------------------------------------
Time: 2019-05-07 19:02:30
-------------------------------------------
('human', 1096)
('bot', 396)

-------------------------------------------
Time: 2019-05-07 19:02:32
-------------------------------------------
('human', 1096)
('bot'

In [50]:
#Answer 2.3.b: Create stream that gets the number of actions by type. Also get the number of actions by wiki.
ssc = StreamingContext(sc, 2)
ssc.checkpoint('hdfs:///homes/{}/wikicheck/'.format(username))
dstream = KafkaUtils.createStream(ssc, ZOOKEEPER_QUORUM, 'test-wiki-group-' + username, {'wiki-edits': 1})
wiki_dstream = dstream.flatMap(lambda x: parse_wiki(x[1]))

wiki_count_type = wiki_dstream.map(lambda x: x['type']).countByValueAndWindow(600, 2)

wiki_count_wiki = wiki_dstream.map(lambda x: x['wiki']).countByValueAndWindow(600, 2)

wiki_count_type.pprint()
wiki_count_wiki.pprint()
ssc.start()

-------------------------------------------
Time: 2019-05-07 19:03:04
-------------------------------------------
('edit', 63470)
('categorize', 23107)
('new', 2165)
('log', 6416)
('142', 47)

-------------------------------------------
Time: 2019-05-07 19:03:04
-------------------------------------------
('wikidatawiki', 32104)
('jawiki', 762)
('commonswiki', 14516)
('enwiki', 13232)
('enwiktionary', 4300)
('svwikisource', 60)
('arwiki', 1086)
('cawiki', 655)
('idwiki', 182)
('nowiki', 179)
...

-------------------------------------------
Time: 2019-05-07 19:03:06
-------------------------------------------
('edit', 109606)
('categorize', 40718)
('new', 4153)
('log', 10791)
('142', 67)



In [51]:
ssc.stop(stopSparkContext=False, stopGraceFully=True)

-------------------------------------------
Time: 2019-05-07 19:03:06
-------------------------------------------
('wikidatawiki', 53907)
('jawiki', 1293)
('commonswiki', 25471)
('enwiki', 24546)
('enwiktionary', 4835)
('svwikisource', 95)
('arwiki', 2419)
('cawiki', 1237)
('idwiki', 301)
('nowiki', 346)
...

-------------------------------------------
Time: 2019-05-07 19:03:08
-------------------------------------------
('edit', 109634)
('categorize', 40734)
('new', 4154)
('log', 10795)
('142', 67)

-------------------------------------------
Time: 2019-05-07 19:03:08
-------------------------------------------
('wikidatawiki', 53921)
('jawiki', 1293)
('commonswiki', 25491)
('enwiki', 24550)
('enwiktionary', 4835)
('svwikisource', 95)
('arwiki', 2419)
('cawiki', 1237)
('idwiki', 301)
('nowiki', 347)
...

-------------------------------------------
Time: 2019-05-07 19:03:10
-------------------------------------------
('edit', 109665)
('categorize', 40739)
('new', 4159)
('log', 10795)
(

In [52]:
#Answer 2.3.c: write the streams to topics
ssc = StreamingContext(sc, 2)
ssc.checkpoint('hdfs:///homes/{}/wikicheck/'.format(username))
dstream = KafkaUtils.createStream(ssc, ZOOKEEPER_QUORUM, 'test-wiki-group-' + username, {'wiki-edits': 1})
wiki_dstream = dstream.flatMap(lambda x: parse_wiki(x[1])).cache()

wiki_count_type = wiki_dstream.map(lambda x: x['type']).countByValueAndWindow(600, 30)
wiki_count_wiki = wiki_dstream.map(lambda x: x['wiki']).countByValueAndWindow(600, 30)
wiki_count_bot = wiki_dstream.map(lambda x: 'bot' if x['bot'] else 'human').countByValueAndWindow(600, 30)


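from pykafka import KafkaClient
from functools import partial

def sendToKafka(records, topic):
    # Runs on the executors: open one Kafka connection per partition, not per record.
    client = KafkaClient(zookeeper_hosts=ZOOKEEPER_QUORUM)
    topic_name = 'wiki-plot-{}-{}'.format(topic, username).encode()
    with client.topics[topic_name].get_sync_producer() as producer:
        for record in records:
            # Each record is a (value, count) pair, serialized as "value,count".
            producer.produce("{},{}".format(*record).encode())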

wiki_count_type.foreachRDD(lambda rdd: rdd.foreachPartition(partial(sendToKafka, topic="type")))
wiki_count_wiki.foreachRDD(lambda rdd: rdd.foreachPartition(partial(sendToKafka, topic="wiki")))
wiki_count_bot.foreachRDD(lambda rdd: rdd.foreachPartition(partial(sendToKafka, topic="bot")))

ssc.start()
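
The producer above serializes each `(value, count)` pair as a comma-separated byte string, and the consumer has to undo that. A small sketch of the wire format and its inverse, assuming the hypothetical helper names `encode_record` and `decode_record`, which are not part of the original notebook. Splitting from the right keeps the parsing correct even if a value itself contained a comma:

```python
def encode_record(record):
    """Serialize a (value, count) pair as b"value,count", the format used by sendToKafka."""
    return "{},{}".format(*record).encode()

def decode_record(payload):
    """Parse b"value,count" back into a (value, count) tuple."""
    # Split on the last comma only, so commas inside the value are preserved.
    value, count = payload.decode().rsplit(",", 1)
    return value, int(count)

payload = encode_record(('enwiki', 13232))
```

Keeping both directions next to each other makes it easier to change the format later, for instance to JSON, without the producer and the consumer drifting apart.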

Here is an example of a possible plot, but you can use pie charts or other representations as well.

In [None]:
#Answer: create the plot in bokeh

from bokeh.io import push_notebook, show, output_notebook
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource, Range1d
import time
from bokeh.transform import factor_cmap
from bokeh.palettes import Spectral6

output_notebook()

x = ['bot', 'human']
y = [0, 0]
source = ColumnDataSource(data=dict(x=x, y=y))

p = figure(x_range=x, plot_height=350, toolbar_location=None, title="Bot Counts")
p.vbar(x='x', top='y', width=0.9, source=source, legend="x",
       line_color='white', fill_color=factor_cmap('x', palette=Spectral6, factors=x))

handle=show(p, notebook_handle=True)

consumer = client.topics[('wiki-plot-bot-' + username).encode()].get_simple_consumer()

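try:
    for message in consumer:
        if message is not None:
            # Messages are formatted as "name,count", where name is 'bot' or 'human'.
            n, c = message.value.decode().split(",")
            if n == 'bot':
                y[0] = int(c)
            else:
                y[1] = int(c)

            # Push the updated counts to the plot already shown in the notebook.
            source.data = dict(x=x, y=y)
            push_notebook(handle=handle)
            time.sleep(1)
except KeyboardInterrupt:
    print("Consumer interrupted.")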
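
For the pie-chart alternative mentioned above, the latest counts first have to be converted into wedge angles. A minimal sketch in plain Python, assuming a hypothetical helper `wedge_angles` (not part of the notebook or of bokeh) that returns the start and end angle of each category in radians:

```python
from math import pi

def wedge_angles(counts):
    """Map {category: count} to {category: (start_angle, end_angle)} in radians.

    Hypothetical helper: each category gets a share of the full circle
    proportional to its count.
    """
    total = sum(counts.values())
    angles = {}
    start = 0.0
    for name, count in counts.items():
        # With no data yet (total == 0) every wedge has zero width instead of dividing by zero.
        end = start + (2 * pi * count / total if total else 0.0)
        angles[name] = (start, end)
        start = end
    return angles

angles = wedge_angles({'bot': 1, 'human': 3})
```

The start and end angles could then be passed to a wedge glyph. The plotting call itself is left out here because it depends on the bokeh version installed.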

In [None]:
#Answer: stop the stream context, and release the spark context to free resources. Remember to stop the kernel
ssc.stop(stopSparkContext=True, stopGraceFully=True)

In [None]:
sc.stop()