Streaming models#

Static models#

  • A pretrained model can be applied to a stream of data.

  • We will train a model using scikit-learn and predict in a loop.

  • Then try to convert it to a river streaming model before applying it to a stream of data.

  • Finally, create a streaming model directly in river.

from sklearn import datasets
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import train_test_split, KFold, cross_val_score
from sklearn.pipeline import Pipeline
from sklearn import preprocessing

# Load the data
dataset = datasets.load_breast_cancer()
X, y = dataset.data, dataset.target

# Split the data into train and test sets
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.33, random_state=42)

# Define the steps of the model
model = Pipeline([
    ('scale', preprocessing.StandardScaler()),
    ('log_reg', SGDClassifier(loss="log_loss", penalty="l2")) 
    # (Logistic Regression using Stochastic Gradient Descent to enable .partial_fit())
])

# Ten-fold cross-validation
cv = KFold(n_splits=10, shuffle=True, random_state=42)

# Compute the accuracy
scores = cross_val_score(model, X_train, y_train, scoring='accuracy', cv=cv)

# Display the average score and it's standard deviation
print(f'Accuracy: {scores.mean():.3f}{scores.std():.3f})')
Accuracy: 0.971 (± 0.022)
# Fit the model with all training data
model.fit(X_train, y_train)
Pipeline(steps=[('scale', StandardScaler()),
                ('log_reg', SGDClassifier(loss='log_loss'))])
In a Jupyter environment, please rerun this cell to show the HTML representation or trust the notebook.
On GitHub, the HTML representation is unable to render, please try loading this page with nbviewer.org.
# Loop over the test data (i.e., stream) and compute the accuracy
acc = 0
for x, y in zip(X_test, y_test):
    acc += model.score(x.reshape(1, -1), y.reshape(1, -1))
print(f'Accuracy: {acc / len(X_test) * 100:.2f}%')
Accuracy: 96.81%

Converting to river#

# Extract the mean and standard deviation of each feature
mean = model.named_steps['scale'].mean_
std = model.named_steps['scale'].scale_

# Create a river StandardScaler and insert the mean and variance
from river.preprocessing import StandardScaler
ss = StandardScaler()
ss.vars = dict(zip(dataset.feature_names, std ** 2))
ss.means = dict(zip(dataset.feature_names, mean))
# Wrap the Logistic Regression model
import numpy as np
from river.compat import convert_sklearn_to_river
streaming_model = convert_sklearn_to_river(model.named_steps['log_reg'], classes=np.unique(y_train))
# Predict the test set one sample at a time
from river import metrics
from river import stream
metric = metrics.Accuracy()
for x, y in stream.iter_array(X_test, y_test, feature_names=dataset.feature_names):
    x_scaled = ss.transform_one(x)
    y_pred = streaming_model.predict_one(x_scaled)
    metric.update(y_pred, y)
    # metric = metric.update(y_pred, y) # For river < 0.21
print(metric)
Accuracy: 96.81%

Direct usage of river#

  • The defaults in scikit-learn’s SGDClassifier and river’s LogisticRegression are different.

  • Pipelines can be made with a parenthesis and a pipe symbol.

# Create a pipeline using the StandardScaler and the Logistic Regression model from river
from river import linear_model

streaming_model = (StandardScaler() | linear_model.LogisticRegression())
# Train the streaming_model with the training data
#dataset_train = dataset
#dataset_train.data = X_train.copy()
#dataset_train.target = y_train.copy()

# Online learning
for x, y in stream.iter_array(X_train, y_train, feature_names=dataset.feature_names):
    streaming_model.learn_one(x,y)
# Predict the test set one sample at a time
metric = metrics.Accuracy()
static_pred = []
for x, y in stream.iter_array(X_test, y_test, feature_names=dataset.feature_names):
    y_pred = streaming_model.predict_one(x)
    static_pred.append(y_pred)
    metric.update(y_pred, y)
print(metric)
Accuracy: 97.34%

Note

The training regimes are different, so we cannot expect river and scikit-learn to give exactly the same models.

Dynamic models#

  • For comparison, we can continue learning during prediction of the test set (given that labels come together with the streaming data).

# Learn and predict the test set one sample at a time
metric = metrics.Accuracy()
dynamic_pred = []
for x, y in stream.iter_array(X_test, y_test, feature_names=dataset.feature_names):
    y_pred = streaming_model.predict_one(x)
    streaming_model.learn_one(x, y)
    dynamic_pred.append(y_pred)
    metric.update(y_pred, y)
print(metric)
Accuracy: 96.81%

Inspect predictions one by one

# Plot y_test together with static and dynamic predictions in two plots above each other
import matplotlib.pyplot as plt
plt.figure(figsize=(10, 3))
plt.subplot(2, 1, 1)
plt.plot(y_test, label='y_test')
plt.plot(static_pred, label='static_pred')
plt.legend(loc='upper left')
plt.subplot(2, 1, 2)
plt.plot(y_test, label='y_test')
plt.plot(dynamic_pred, label='dynamic_pred')
plt.legend(loc='upper left')
plt.show()
../../_images/0004322403070b943e6d293ef732bb812c2944852788b9f923fc7db40b2f5947.png

Comment: In this case, one classification changed to the worse with the dynamic model, the rest stayed the same.

River vs scikit-learn#

  • First of all, these are only examples of machine learning frameworks.

    • There are several other established and polished packages to use in their places, e.g., Quix, (scikit-multiflow + creme = River), PyTorch, TensorFlow, theano, PyCaret, OpenCV, etc.

  • River is built from the ground up with streaming data in mind.

    • Pre-processors, regressors and classifiers are all incremental.

    • A host of convenience functions for online/batch-wise learning are available.

  • scikit-learn is built for tabular data.

    • .partial_fit() is available for some pre-processors, regressors and classifiers.

    • Stream handling can be manually coded or helped by River and friends.

Live reading of Twitch chat, revisited#

# Check if user is logged in ("==" active chat)
import requests
user = "summit1g" # Change this to the user you want to check, e.g., epicdan22, zackrawrr, summit1g, mizkif, cohhcarnage, etc.
response = requests.get("https://decapi.me/twitch/uptime/"+user).text
is_online = response != user+" is offline"
print(is_online)
True
# Connect to the Twitch chat using River
from river import stream

oauth = open('../../../No_sync/twitch_oauth','r').read()
twitch_chat = stream.TwitchChatStream(
    nickname="khliland", # Exchange with your Twitch username
    token=oauth,
    channels=[user]
)
# If the user is online, print the first messages
if is_online:
    messages = 2
    for item in twitch_chat:
        print(item)
        if messages > 1:
            messages -= 1
        else:
            print("Puh, that's enough!")
            break
{'dt': datetime.datetime(2024, 12, 4, 21, 24, 35, 598231), 'channel': 'summit1g', 'username': 'fossabot', 'msg': "summit's camera: Elgato Facecam Pro https://amzn.to/4eIaG0C"}
{'dt': datetime.datetime(2024, 12, 4, 21, 24, 45, 838806), 'channel': 'summit1g', 'username': 'teeveeforme', 'msg': 'is this as good as dungeonborne?'}
Puh, that's enough!
# Create a river stream that counts the number of characters in the 'msg' part of the 'item' dictionary and plots it
import matplotlib.pyplot as plt
from river.stats import Mean
from IPython import display
import time
messages = 20
i = 0
message_length = [np.nan] * messages
mean_length = [np.nan] * messages
mean = Mean()
if is_online:
    figure, ax = plt.subplots(figsize=(7,2))
    plt.ion()
    for item in twitch_chat:
        if i < messages:
            message_length[i] = len(item['msg'])
            print(message_length[i])
            mean.update(message_length[i]) # river stats
            mean_length[i] = mean.get()
            plt.clf()
            plt.plot(list(range(messages)), message_length, label='Message length')
            plt.plot(list(range(messages)), mean_length, label='Mean message length')
            plt.xlim(0, messages-1)
            plt.legend(loc='upper right')
            display.display(plt.gcf())
            display.clear_output(wait=True)
            time.sleep(0.0001)
            i += 1
        else:
            print("Puh, that's enough!")
            break
plt.show()
---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
Cell In[15], line 14
     12 figure, ax = plt.subplots(figsize=(7,2))
     13 plt.ion()
---> 14 for item in twitch_chat:
     15     if i < messages:
     16         message_length[i] = len(item['msg'])

File ~/miniforge3/envs/IND320_2024/lib/python3.12/site-packages/river/stream/twitch_chat_stream.py:161, in TwitchChatStream.__iter__(self)
    159 with socket.socket() as sock:
    160     self._setup_connection(sock)
--> 161     yield from map(dataclasses.asdict, self._gen_items(sock))

File ~/miniforge3/envs/IND320_2024/lib/python3.12/site-packages/river/stream/twitch_chat_stream.py:143, in TwitchChatStream._gen_items(self, sock)
    141 while True:
    142     try:
--> 143         data = sock.recv(self.buffer_size)
    144         if not data:
    145             continue

KeyboardInterrupt: 
../../_images/061cf52507f63dd9a5b0e847a08824194f580914dc290fd63f42c0361427a1b5.png

Synthetic streams#

  • river can generate synthetic streams of various types.

# Generate Agrawal stream with classification type 0
from river.datasets.synth import Agrawal
dataset = Agrawal(classification_function=0, seed=42)
dataset
for x, y in dataset.take(5):
    print(list(x.values()), y)

Exercise#

*HoeffdingTreeRegressor: This was originally called a Hoeffding Anytime Tree (HATT). It is an algorithm that is extremely efficient at updating decision trees with streaming data.

Streaming forecasts#

  • river includes the SNARIMAX model, where N stands for non-linear, i.e., the (S)easonal (N)on-linear (A)uto®egressive (I)ntegrated (M)oving-(A)verage with e(X)ogenous inputs model.

  • The basic parameters match SARIMAX from the statsmodels package, but are named p/d/q/sp/sd/sq/m.

  • If no regressor is specified, a pipeline containing a StandardScaler and LinearRegression is used.

  • No statistics or summary tables are produced, so summaries must be manually created.

Airline passenger data#

  • Monthly international passenger data from January 1949 through December 1960.

from river import datasets
for t, (x, y) in enumerate(datasets.AirlinePassengers()):
    print(x, y)
    if t > 5:
        break
import datetime as dt
from river import time_series
from river import metrics

#period = 12
model = time_series.SNARIMAX( p=3, d=1, q=3 )

y_test = []
for t, (x, y) in enumerate(datasets.AirlinePassengers()):
    if t > 143-12: # Stop learning before the last 12 months
        y_test.append(y)
    else:
        model.learn_one(y)
        # model = model.learn_one(y)# For river < 0.21

horizon = 12 # Predict 12 months into the future
future = [
    {'month': dt.date(year=1960, month=m, day=1)}
    for m in range(1, horizon + 1)
]
forecast = model.forecast(horizon=horizon)
metric = metrics.R2()
for x, y_pred, y_truth in zip(future, forecast, y_test):
    print(x['month'], f'{y_pred:.3f}', f'{y_truth:.3f}')
    metric.update(y_truth, y_pred)
    # metric = metric.update(y_truth, y_pred).get() # For river < 0.21
print(metric)

SARIMA + feature engineering#

  • In addition to the original time series, we may add some freshly calculated exogenous variables.

  • In river’s SNARIMAX example, a distance function resembling a Radial Basis Function is applied to the months

    • This results in 12 new features measuring the distance to other months in the year.

    • In addition they include ordinal dates, i.e., day number since 0001-01-01.

import calendar
import math
from river import compose
from river import linear_model
from river import optim
from river import preprocessing

def get_month_distances(x):
    return {
        calendar.month_name[month]: math.exp(-(x['month'].month - month) ** 2)
        for month in range(1, 13)
    }

def get_ordinal_date(x):
    return {'ordinal_date': x['month'].toordinal()}

extract_features = compose.TransformerUnion(
    get_ordinal_date,
    get_month_distances
)
extract_features.transform_one({'month': dt.date(year=1960, month=1, day=1)})
extract_features.transform_one({'month': dt.date(year=1960, month=4, day=1)})
model = (
    extract_features |
    time_series.SNARIMAX(
        p=1,
        d=0,
        q=0,
        m=12, # Seasonal model with period 12
        sp=3,
        sq=6,
        regressor=(
            preprocessing.StandardScaler() |
            linear_model.LinearRegression(
                intercept_init=110, # Help getting a better start
                optimizer=optim.SGD(0.01),
                intercept_lr=0.3
            )
        )
    )
)
# Modelling and predicting the AirlinePassengers dataset
y_test = []
for t, (x, y) in enumerate(datasets.AirlinePassengers()):
    if t > 143-12: # Stop learning before the last 12 months
        y_test.append(y)
    else:
        model.learn_one(x,y) # Note!: Dates are used as features
        # model = model.learn_one(y)# For river < 0.21

horizon = 12 # Predict 12 months into the future
future = [
    {'month': dt.date(year=1960, month=m, day=1)}
    for m in range(1, horizon + 1)
]
forecast = model.forecast(horizon=horizon)
metric = metrics.R2()
for x, y_pred, y_truth in zip(future, forecast, y_test):
    print(x['month'], f'{y_pred:.3f}', f'{y_truth:.3f}')
    metric.update(y_truth, y_pred)
    # metric = metric.update(y_truth, y_pred).get() # For river < 0.21

print(metric)