Streaming data#

  • Data that can be loaded chunk-wise (single or a few at the time).

  • Data flowing through a socket/pipe/streaming API/…

  • Typically requires:

    • Checking if stream is active.

    • A reactive reading mechanism or time delayed loop to check for updates.

    • Error/exception checking/handling.

    • A receiving enitity that can use the incomming data, e.g., printing to screen, updating graphics, updating a model, predicting some outcome/state/quantity, …

Live reading of Twitch chat#

# Check if user is logged in ("==" active chat)
import requests
user = "k3soju" # Change this to the user you want to check, e.g., epicdan22, zackrawrr, summit1g, 
# mizkif, cohhcarnage, k3soju, etc.
response = requests.get("https://decapi.me/twitch/uptime/"+user).text
is_online = response != user+" is offline"
print(is_online)
False
# Connect to the Twitch chat using River
from river import stream

if is_online:
    oauth = open('../../../No_sync/twitch_oauth','r').read()
    twitch_chat = stream.TwitchChatStream(
        nickname="khliland", # Exchange with your Twitch username
        token=oauth,
        channels=[user]
    )
# If the user is online, print the first 2 messages
if is_online:
    messages = 10
    for item in twitch_chat:
        print(item)
        if messages > 1:
            messages -= 1
        else:
            print("Puh, that's enough!")
            break

Comments#

  • Here, River has formated everything nicely for us as dictionaries.

    • River’s streaming format is based on dictionaries to minimize overhead.

  • See escpecially the datetime formatting.

  • We will return to River and streaming Machine Learning later in the book.

Exercise#

Make a clock using World Time API