Yet another end-to-end streaming dashboarding example

DEV Community
Agile Developer

## Introduction

In this post, we present an introductory example that uses Apache Pinot to ingest an Apache Kafka stream. It builds upon existing Apache Pinot material from the official trainings and documentation. The purpose is not just to rehash what is in the official docs, but to prepare the ground for a second part by adapting the official examples to that end. Moreover, when I tried to run these examples, I had some extra ideas on how to better present the material. Part of the presented setup is also based on yet another Apache Pinot example, from a complementary series of lectures written for JavaScript; our focus here is Python. Here are the two references I used:

- [Lecture 4](https://github.com/startreedata/learn/tree/main/pinot-advanced/04-stream-ingestion), from a series on advanced Pinot usage by StarTree. I ported the JS example to Python.
- The Streamlit example (updated continuously).

Another purpose of this introduction is to document my learning process, so that I can use it later as a reference or as personal notes. Consequently, the coherence of the material presented is of paramount importance. For a formal introduction to Apache Pinot, the excellent playlists below are highly recommended:

- Apache Pinot 101
- Apache Pinot 201

Let's start our journey. Our setup is completely local. We will use exclusively Podman. All executions are done on Windows 11 using Command Prompt terminals under VSCodium; you might need to apply some minor changes for your environment (if any). The docker compose file is mostly covered here; we just added a `.env` file for convenience.

```
podman compose up -d
```

This starts an Apache Kafka single-node cluster and an Apache Pinot cluster with one Controller, one Broker and one Server node. More on this later. You can visit the Apache Pinot Controller UI here.

Our data source is the Wikipedia recent-changes stream. It lives at https://stream.wikimedia.org/v2/stream/recentchange, and people can visit it with their browser and see these events.
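What the browser shows at that URL is a raw Server-Sent Events feed. Each event looks roughly like the fragment below; the layout (`event:`/`id:`/`data:` lines) is the SSE wire format, while the field values are illustrative and heavily truncated, not real stream data:

```
event: message
id: [{"topic":"eqiad.mediawiki.recentchange","partition":0,...}]
data: {"$schema":"/mediawiki/recentchange/1.0.0","meta":{...},"type":"edit","user":"...","timestamp":1715500000,...}
```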
Obviously, the typical web surfer is not interested in this overwhelming, ever-growing list of repetitive JSON content. It is so large that one has to resort to Data Analytics methods to make sense of it. Moreover, this event stream is not structured to convey meaning the way a typical web page does. On the contrary, methods of Data Engineering are necessary to capture it in a streaming table (Apache Spark terminology is used here), do whatever data transformations are necessary, and then make it available to a Data Analytics system for visualizing its different aspects.

The Wikimedia documentation describes this stream and lists various code snippets on how to consume it. In terms of Data Engineering, it is a web service that exposes continuous streams of structured event data, and it does so over HTTP. For a Data Engineer, the source transport format is half the story; the rest is the schema, which is available here. In terms of software development, this means that we need a client library. There are many, but the SSE client stands out. It is also used in the Streamlit tutorial of Apache Pinot. For simplicity, we will use the Wikipedia approach. Here is the adapted code from Wikipedia:

```python
import json
# EventSource comes from an SSE client library; this import line is an
# assumption (e.g. the requests-sse package provides a context-manager
# EventSource), pick the client your environment uses:
# from requests_sse import EventSource

url = 'https://stream.wikimedia.org/v2/stream/recentchange'
headers = {"User-Agent": "advanced_pinot_tutorial"}

with EventSource(url, headers=headers) as stream:
    for event in stream:
        if event.type == 'message':
            try:
                change = json.loads(event.data)
                change['ts'] = change['timestamp'] * 1000
                del change['timestamp']
                # Kafka placeholder: push code goes here
            except ValueError:
                pass
```

From the schema, what stands out for a streaming source is the `timestamp` field. The conversion above renames it to avoid a conflict with any internal timestamp function, and it also converts the Unix timestamp from seconds to milliseconds. Keep that in mind. Now we need some code to push to an Apache Kafka topic. We use the confluent-kafka library.
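The producer snippets that follow reference a delivery callback named `acked`, which is never defined in the post. A minimal sketch following confluent-kafka's delivery-report convention; the function name comes from the snippet, the body is an assumption:

```python
def acked(err, msg):
    """Delivery report callback: confluent-kafka invokes it as acked(error, message)."""
    if err is not None:
        # msg.key() is the event id we set when producing
        print(f"Failed to deliver message with key {msg.key()}: {err}")
```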
First we set up our Apache Kafka connection (we implicitly assume the default port 9092 for Apache Kafka), which is pretty much self-explanatory:

```python
from confluent_kafka import Producer, admin

kafka_topic_name = "wikipedia-events"

# conf = {'bootstrap.servers': 'redpanda-0,redpanda-1,redpanda-2'}
conf = {'bootstrap.servers': 'kafka'}

kafka_admin = admin.AdminClient(conf)
kafka_admin.delete_topics([kafka_topic_name])
kafka_admin.create_topics([admin.NewTopic(kafka_topic_name, 1, 1)])

producer = Producer(conf)
```

Then, in the Apache Kafka placeholder of the previous snippet, we put the push logic:

```python
producer.poll(0)
producer.produce(kafka_topic_name, key=change["meta"]["id"],
                 value=json.dumps(change), callback=acked)
events_processed += 1
if events_processed == 100:
    print(f"{str(datetime.datetime.now())} Flushing after {events_processed} events")
    producer.flush()
    events_processed = 0
```

Every 100 events, we flush and log the push of the batch. Confluent has very good documentation on how this library is used. We pack the application into a Docker image:

```
podman build -t pinot-advanced/python-streaming-ingest ./producer-app
```

and then we run it:

```
podman run -it --network=pinot-advanced pinot-advanced/python-streaming-ingest:latest
```

Now it is time to verify that the Apache Kafka push is working appropriately. For convenience, a consumer Python app is provided. You can start it with similar commands:

```
podman build -t pinot-advanced/python-kafka-consumer ./consumer-app
podman run -it --network=pinot-advanced pinot-advanced/python-kafka-consumer:latest
```

Everything seems to work fine.

In order to create the streaming table, we need to tell Apache Pinot both the transport format and the schema. The schema need not be exhaustive; it only has to include the subset of fields we need. For this reason we need two files. Each column in Apache Pinot has one of the following types:

- Dimension
- Metric
- Date/Time

It is pretty obvious what the last one is used for. The first one is for filtering (used for drilling down); the second one is for aggregations.
This distinction does not exist in relational databases or other Big Data solutions, and it is part of what makes Apache Pinot a true Big Data streaming solution. We will not need any metric fields, since we get a stream of data edits. We will do what people call distinct counts, which in reality is an aggregation, but the fields we will use are not numeric, so they cannot go into the metric fields section. Here is the schema:

```json
{
  "schemaName": "wikievents",
  "dimensionFieldSpecs": [
    { "name": "metaJson", "dataType": "STRING" },
    { "name": "user", "dataType": "STRING" },
    { "name": "domain", "dataType": "STRING" },
    { "name": "topic", "dataType": "STRING" }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "ts",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
```

Next comes the table configuration and transport format. See https://github.com/fithisux/visualize-streamlit-pinot-example/blob/main/scripts/wikipedia_events_realtime_table_config.json for the details. I will just focus on this snippet:

```json
{
  "transformConfigs": [
    { "columnName": "domain", "transformFunction": "JSONPATH(metaJson, '$.domain')" },
    { "columnName": "topic", "transformFunction": "JSONPATH(metaJson, '$.topic')" }
  ]
}
```

It is necessary so as to grab the fields from the JSON payload of the Apache Kafka message. Fields `topic` and `domain` are computed fields, and for this reason we need to explicitly expose the `metaJson` column. With the compose file and streamer app up and running, we will construct our table in Apache Pinot.
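To see what the `JSONPATH` transform extracts, here is a plain-Python equivalent on a made-up `metaJson` payload; the field values below are illustrative, not real stream data:

```python
import json

# Illustrative metaJson payload, as it would sit inside a Kafka message (values are made up).
meta_json = json.dumps({
    "id": "f5f9d88e-0000-0000-0000-000000000000",
    "domain": "en.wikipedia.org",
    "topic": "eqiad.mediawiki.recentchange",
})

# JSONPATH(metaJson, '$.domain') behaves like reading the top-level key:
domain = json.loads(meta_json)["domain"]
topic = json.loads(meta_json)["topic"]
print(domain, topic)  # en.wikipedia.org eqiad.mediawiki.recentchange
```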
```
podman run -it --network=pinot-advanced -v ./scripts/wikipedia_events_schema.json:/scripts/wikipedia_events_schema.json -v ./scripts/wikipedia_events_realtime_table_config.json:/scripts/wikipedia_events_realtime_table_config.json apachepinot/pinot:latest-25-ms-openjdk AddTable -schemaFile /scripts/wikipedia_events_schema.json -tableConfigFile /scripts/wikipedia_events_realtime_table_config.json -controllerHost pinot-controller -exec
```

We mount `./scripts` on a purpose-built container that uses the schema and the table config to create the table. You can view the table by navigating to the Pinot Controller locally and run your first query:

```sql
select domain, topic, user, ts from wikievents limit 10;
```

Here is a sample of what you should expect.

Deviating from the sample Streamlit app provided by StarTree, but similar in spirit, we provide a dashboard. Before delving into the code base, let's clarify its business logic. We run a sampling query over a window that stretches from the sampling time one minute back into the past. In this window we sample three important quantities:

- the number of changes that happened,
- the distinct users that committed these changes,
- the distinct domains where these changes took place.

Our dashboard carries the current sample plus a buffer of the 30 latest samples: we record each sample and plot the 30-sample buffer as a visual summary. The dashboard is implemented with the Panel Python package in a notebook; I used VSCodium for convenience. It is advised to create a virtual environment, install the dependencies there, and then use it as a kernel for executing the notebook. Obtaining a sample is just an Apache Pinot query away:

```sql
select count(*) AS events1Min,
       distinctcount(user) AS users1Min,
       distinctcount(domain) AS domains1Min
from wikievents_REALTIME
where ts > ago('PT1M')
limit 1;
```

The `ago` function takes an ISO 8601 duration and constructs the lower bound of the window.
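For intuition, `ago('PT1M')` evaluates to the current time minus the given duration, expressed in epoch milliseconds (matching the `ts` column configured earlier). A client-side sketch of the same bound; the helper name is mine, not part of Pinot or the post:

```python
import time

def ago_pt1m_ms(now_s=None):
    """Approximate Pinot's ago('PT1M'): now minus one minute, in epoch milliseconds."""
    now_s = time.time() if now_s is None else now_s
    return int(now_s * 1000) - 60_000

# With a fixed clock the bound is exactly one minute earlier:
print(ago_pt1m_ms(1_700_000_060))  # 1700000000000
```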
This is our main building block. To implement the sampling logic, here is the relevant notebook cell:

```python
from pinotdb import connect
import pandas as pd

conn = connect(host='localhost', port=8099, path='/query/sql', scheme='http')

list_of_samples = []

def get_changes():
    query = """
    select count(*) AS events1Min,
           distinctcount(user) AS users1Min,
           distinctcount(domain) AS domains1Min
    from wikievents_REALTIME
    where ts > ago('PT1M')
    limit 1;
    """
    curs = conn.cursor()
    curs.execute(query)
    temp_df = pd.DataFrame(curs, columns=[item[0] for item in curs.description])
    temp_df['sample_time'] = pd.Timestamp.now()
    list_of_samples.append(temp_df)
    if len(list_of_samples) > 30:
        list_of_samples.pop(0)
    return temp_df.to_dict('records')[0], pd.concat(list_of_samples).sort_values(by=["sample_time"])
```

The sample is returned as a dict, while the past buffer is concatenated into a pandas data frame. A sample execution follows:

```
({'events1Min': 2216, 'users1Min': 362, 'domains1Min': 80,
  'sample_time': Timestamp('2026-05-12 12:58:12.996165')},
    events1Min  users1Min  domains1Min                sample_time
 0        2216        362           80 2026-05-12 12:58:12.996165)
```

The next cell sets up the reactivity of our data:

```python
# Necessary for reactive pandas
import panel as pn
import hvplot.pandas

pn.extension()

sample_df, samples_df = get_changes()
table_changes = pn.rx(sample_df)
samples_df_rx = pn.rx(samples_df)

## Extract Data
def update_table_changes():
    sample_df, samples_df = get_changes()
    table_changes.rx.value = sample_df
    samples_df_rx.rx.value = samples_df

pn.state.add_periodic_callback(update_table_changes, period=60000)
```

See the documentation of the Panel library here. The most important statement is the last one, which sets up a one-minute update period for the data feeding our plots. The next cells create a dashboard with the absolute defaults; no effort to tinker with CSS is taken. I will not spend time on the Panel components, as the documentation is very thorough. What is remarkable, though, is that you can directly serve the notebook with Panel.
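As a side note on the buffer design: `list_of_samples.pop(0)` is O(n) on a list, and the bound check is easy to get wrong. A `collections.deque` with `maxlen=30` performs the same bounded-buffer eviction automatically. A small sketch, with integers standing in for the sample data frames:

```python
from collections import deque

buffer = deque(maxlen=30)   # once full, the oldest entry is dropped automatically
for i in range(35):         # push 35 fake "samples"
    buffer.append(i)

print(len(buffer), buffer[0], buffer[-1])  # 30 5 34
```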
From your activated virtual environment, run:

```
panel serve .\dashboard.ipynb
```

and navigate to http://localhost:5006/dashboard to visit your dashboard.

In this article we gave an example of an end-to-end dashboard backed by an Apache Pinot streaming table. The original stream comes from an Apache Kafka topic; it captures the Wikipedia page edits and is customarily used for streaming tutorials. We gave a quick description of the Apache Kafka and Apache Pinot setup, of how to ingest the page edits, and of how to visualize them. RedPanda can be used instead of Apache Kafka; see the related Readme.md for the necessary, but minimal, changes. As always, the code is provided. If you find something unclear, spot a bug, or have any suggestion, do not hesitate to post in the comments. I hope you enjoyed it.