
Subscribing to Data

This guide shows you how to use the ReductStore Python SDK to subscribe to new records in a bucket.

Prerequisites

If you don't have a ReductStore instance running, you can easily spin one up as a Docker container. To do this, run the following command:

docker run -p 8383:8383 reduct/store:latest

Also, if you haven't already installed the SDK, you can use the pip package manager to install the reduct-py package:

pip install reduct-py

Example

For demonstration purposes, we will create a script with two coroutines:

  1. writer writes data to an entry in a bucket, using the current timestamp and a label good that toggles between True and False.
  2. subscriber subscribes to the entry and receives only the records whose label good equals True.

Once the subscriber has received 10 records, it stops the subscription and the writer.
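The numbers in this example fit together as follows: the writer performs 21 writes, toggling good on each one, and the subscriber starts listening about one second after the writer. The plain-Python sketch below replays that schedule without a server. It assumes exactly one write per second and ignores network latency; count_good_after is a hypothetical helper, not part of the SDK.

```python
def count_good_after(total_writes: int, start_second: float) -> int:
    """Count records with good=True written at or after start_second.

    Assumes write i happens at second i and that good starts as True
    and toggles on every write, as in the writer coroutine.
    """
    good = True
    count = 0
    for second in range(total_writes):
        if good and second >= start_second:
            count += 1
        good = not good
    return count


# Of the writer's 21 records, every other one is good.
print(count_good_after(21, 0))  # prints 11
# The subscriber starts about one second later and misses the first one.
print(count_good_after(21, 1))  # prints 10
```

Under these assumptions the subscriber sees exactly 10 good records, which is why it can stop after the tenth.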

This is the whole script:

subscribing.py
import asyncio
from time import time_ns

from reduct import Client, Bucket

client = Client("http://127.0.0.1:8383")
running = True  # cleared by the subscriber to stop the writer


async def writer():
    """Write a blob with a toggling good flag until stopped"""
    bucket: Bucket = await client.create_bucket("bucket", exist_ok=True)
    good = True
    for _ in range(21):
        if not running:
            break
        data = b"Some blob of data"
        ts = time_ns() // 1000  # ReductStore timestamps are in microseconds
        await bucket.write("entry-1", data, ts, labels=dict(good=good))
        print(f"Writer: Record written: ts={ts}, good={good}")
        good = not good
        await asyncio.sleep(1)


async def subscriber():
    """Subscribe to good records and exit after ten are received"""
    global running
    bucket: Bucket = await client.create_bucket("bucket", exist_ok=True)
    counter = 0
    await asyncio.sleep(1)
    async for record in bucket.subscribe(
        "entry-1",
        start=time_ns() // 1000,  # start from "now" in microseconds
        poll_interval=0.2,
        include=dict(good=True),
    ):
        print(
            f"Subscriber: Good record received: ts={record.timestamp}, labels={record.labels}"
        )
        counter += 1
        if counter == 10:
            running = False  # tell the writer to stop
            break


async def main():
    await asyncio.gather(writer(), subscriber())


if __name__ == "__main__":
    asyncio.run(main())
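ReductStore identifies records by UNIX timestamps in microseconds, so a value taken from time_ns() has to be divided by 1,000 before it is passed to write() or used as the start of a subscription. The helpers below are hypothetical (now_us and to_datetime are not part of the SDK); they show that conversion and how to turn a received record.timestamp back into a readable UTC datetime.

```python
from datetime import datetime, timezone
from time import time_ns


def now_us() -> int:
    """Current UNIX time in microseconds (nanoseconds // 1000)."""
    return time_ns() // 1000


def to_datetime(ts_us: int) -> datetime:
    """Convert a microsecond UNIX timestamp to an aware UTC datetime.

    Splitting with divmod keeps the microseconds exact instead of
    going through a float division.
    """
    seconds, micros = divmod(ts_us, 1_000_000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(microsecond=micros)


print(to_datetime(now_us()))
```

Using integer division keeps the timestamp an int, which matches how the script passes it to write() and subscribe().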

The most important part of the script is the subscriber coroutine:

subscribing.py
async def subscriber():
    """Subscribe to good records and exit after ten are received"""
    global running
    bucket: Bucket = await client.create_bucket("bucket", exist_ok=True)
    counter = 0
    await asyncio.sleep(1)
    async for record in bucket.subscribe(
        "entry-1",
        start=time_ns() // 1000,  # start from "now" in microseconds
        poll_interval=0.2,
        include=dict(good=True),
    ):
        print(
            f"Subscriber: Good record received: ts={record.timestamp}, labels={record.labels}"
        )
        counter += 1
        if counter == 10:
            running = False  # tell the writer to stop
            break

The subscribe method queries the entry-1 entry for records starting from the current time and keeps yielding new records whose label good equals True. Since ReductStore provides an HTTP API, the subscribe method polls the entry every poll_interval seconds (0.2 seconds in this example).
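To make the polling behaviour concrete, here is a minimal in-memory model of it. It is not the SDK's implementation: the store is a plain list of (timestamp, labels) tuples and poll_subscribe is a hypothetical name. Each round it looks for records at or after the current start time whose labels match include, yields them in timestamp order, moves start just past the last record it returned, and then sleeps for poll_interval seconds.

```python
import asyncio
from typing import AsyncIterator, Dict, List, Tuple

# (timestamp in microseconds, labels); labels are plain strings in this model
StoredRecord = Tuple[int, Dict[str, str]]


async def poll_subscribe(
    store: List[StoredRecord],
    start: int,
    include: Dict[str, str],
    poll_interval: float,
) -> AsyncIterator[StoredRecord]:
    """Yield matching records forever, re-querying the store periodically."""
    while True:
        # One "query": records at or after start whose labels match include.
        batch = sorted(
            (
                rec
                for rec in store
                if rec[0] >= start
                and all(rec[1].get(key) == value for key, value in include.items())
            ),
            key=lambda rec: rec[0],
        )
        for rec in batch:
            yield rec
            start = rec[0] + 1  # never return the same record twice
        await asyncio.sleep(poll_interval)


async def demo() -> List[int]:
    store: List[StoredRecord] = []

    async def writer() -> None:
        # Toggle good on every write, like the writer coroutine above.
        good = True
        for ts in range(1, 9):
            store.append((ts, {"good": str(good)}))
            good = not good
            await asyncio.sleep(0.01)

    received: List[int] = []
    writer_task = asyncio.create_task(writer())
    async for ts, _labels in poll_subscribe(store, 0, {"good": "True"}, 0.005):
        received.append(ts)
        if len(received) == 3:
            break
    await writer_task
    return received


print(asyncio.run(demo()))  # prints [1, 3, 5]
```

The trade-off is the same as with the real subscription: a shorter poll_interval delivers records sooner but sends more queries to the server.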

When to use subscribing

Subscribing is useful when your application needs to be notified when new records are added to an entry. Some possible use cases are:

  • You can use ReductStore as a simple message broker with persistent storage.
  • Replicating data from one ReductStore instance to another. For example, you can subscribe to records with certain labels and write them to another ReductStore instance for long-term storage.
  • Ingesting data from a ReductStore instance into a data warehouse.
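The replication use case can be sketched with the same calls the script uses, plus record.read_all() to fetch each record's payload. The replicate coroutine below is a hypothetical helper, not part of the SDK. It assumes src and dst are Bucket objects (for example, from two clients pointing at different instances) and copies matching records from src to dst, preserving their timestamps and labels, until limit records have been copied.

```python
import asyncio
from typing import Any, Dict, Optional


async def replicate(
    src: Any,
    dst: Any,
    entry: str,
    start: int,
    include: Dict[str, Any],
    limit: Optional[int] = None,
    poll_interval: float = 1.0,
) -> int:
    """Copy records matching include from src to dst; return how many.

    With limit=None this runs for as long as the subscription does.
    """
    copied = 0
    async for record in src.subscribe(
        entry, start=start, poll_interval=poll_interval, include=include
    ):
        data = await record.read_all()  # payload of the source record
        # Keep the original timestamp and labels on the copy.
        await dst.write(entry, data, record.timestamp, labels=record.labels)
        copied += 1
        if limit is not None and copied >= limit:
            break
    return copied
```

For instance, src could come from await Client("http://127.0.0.1:8383").get_bucket("bucket"), and dst from create_bucket("bucket", exist_ok=True) on a second Client that points at the long-term storage instance.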