Pankaj Tanwar

Pagination with Cassandra - let’s deal with paging large queries in python 🐍


Pagination in Cassandra is one of those hair-pulling problems. Some time back, I encountered a use case where I had to implement pagination in Cassandra. Most of us have grown up in a beautiful world surrounded by typical relational databases. Like any other developer, I had always trusted my friends LIMIT, OFFSET and BETWEEN for implementing pagination. Howdy-hooo!

But my happiness didn't last long when I found there is no such thing in Cassandra at all. With a heavy heart, I double-checked, but no luck. CQL gives you LIMIT, but there is no OFFSET and no BETWEEN. For example, I can get the first 10 rows using the LIMIT operator, but not the next 10 rows, as there is no OFFSET.
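To make it concrete, a quick sketch against the same table used later in this post:

SELECT * FROM my_cute_cassandra_table LIMIT 10;           -- fine: the first 10 rows
SELECT * FROM my_cute_cassandra_table LIMIT 10 OFFSET 10; -- rejected: CQL has no OFFSET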

Even a column in Cassandra is not the same as a column in an RDBMS. It took a while for my mind to digest the way the monster Cassandra works.

Well, I will walk you through my story of this really interesting war with Cassandra paging for large queries, and the journey to conquer it (with actual code).

BONUS - you will learn how smart the Cassandra SELECT * implementation is.

Problem Statement

Our use case was pretty simple: pull everything from a Cassandra table, run heavy, personalized processing on each row, and keep going.

Before you declare me the dumbest person of the year, let me explain why it was not as simple to implement as it looks.


Initial approach

SELECT * FROM my_cute_cassandra_table;

This gives back all the rows, which I can iterate over and get my job done. Easy-peasy, right?

NO.

Let's first understand how smartly the SELECT * query is implemented in Cassandra.

Suppose you have a table with ~1B rows and you run -

SELECT * FROM my_cute_cassandra_table;

and store the result in a variable.

Hold on, it's not gonna eat all the RAM.

Loading all 1B rows into memory would be painful and foolish, and Cassandra doesn't take that silly approach. It does it in a very smart way, fetching data in pages, so you don't have to worry about memory. It just fetches a chunk from the database (~5000 rows, the Python driver's default fetch size) and returns a result cursor you can iterate over to see the rows.

Once our iteration gets close to the end of those 5000 rows, it fetches the next chunk of 5000 rows internally and adds it to the result cursor. It does this so brilliantly that we don't even feel the magic happening behind the scenes.

query = "SELECT * FROM my_cute_cassandra_table;"
results = session.execute(query)
for data in results:
# processing the data
process_data_here(data)
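As a side note, that chunk size is just the driver's default and is tunable. A minimal sketch, assuming the same session object as above:

# bring the transparent paging chunk down from 5000 to 1000 rows per page
session.default_fetch_size = 1000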

I know, that's a really smart approach, but it became a bottleneck for us.

Whatever data we fetched from Cassandra, we needed to run some extensive processing over each payload, and that processing itself took time.

Since iterating over a chunk took a while, by the time we reached the end of the chunk Cassandra decided the connection was not being used anymore and closed it automatically, yelling that it's a timeout.
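As an aside, the Python driver does expose a client-side request timeout that defaults to 10 seconds; raising it is a blunt workaround, though not the route we took:

# blunt workaround (not our fix): allow each request more
# than the default 10 seconds before the client gives up
session.default_timeout = 60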

As we had a lot of data to fetch and process, we needed a way to paginate the data smartly and streamline the whole flow.

How did I solve it?

We deep-dived into Cassandra configurations and found that whenever Cassandra returns a result cursor, it brings a paging state with it. The paging state is an opaque token that helps Cassandra remember which chunk to fetch next.

from cassandra.query import SimpleStatement

query = "SELECT * FROM my_cute_cassandra_table;"
statement = SimpleStatement(query, fetch_size=100)
results = session.execute(statement)

# save the paging state
paging_state = results.paging_state

for data in results:
    process_data_here(data)
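One handy detail: the paging state is just an opaque blob of bytes, so it can be stashed anywhere and handed back to a later execute() call. A minimal sketch, reusing the statement and saved state from above:

# the paging state is plain bytes, so it can be persisted between requests
saved = paging_state.hex()

# ...later: resume exactly where the previous chunk ended
results = session.execute(statement, paging_state=bytes.fromhex(saved))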

We changed our approach a bit, in a slightly tricky way. Based on our use case, we set the fetch size (the size of the chunk we manually ask Cassandra for). And when we got the result cursor, we saved the paging state in a variable.

We also put a check on a counter: once it reaches the manual chunk size, we break out of the iteration and fetch a fresh new chunk using the saved paging state.

from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import SimpleStatement

# connection with cassandra
cluster = Cluster(["127.0.0.1"], auth_provider=PlainTextAuthProvider(username="pankaj", password="pankaj"))
session = cluster.connect()

# setting keyspace
session.set_keyspace("my_keyspace")

# set fetch size (the chunk size we manually hand to Cassandra)
fetch_size = 100

# fetches a fresh chunk starting from the given paging state
def fetch_a_fresh_chunk(paging_state=None):
    query = "SELECT * FROM my_cute_cassandra_table;"
    statement = SimpleStatement(query, fetch_size=fetch_size)
    return session.execute(statement, paging_state=paging_state)

next_page_available = True
paging_state = None

while next_page_available:
    # fetch a new chunk starting at the saved paging state
    results = fetch_a_fresh_chunk(paging_state)
    # save the paging state so the next query resumes where this chunk ends
    paging_state = results.paging_state
    data_count = 0
    for data in results:
        # process payload here.....
        process_data_here(data)
        data_count += 1
        # once we reach fetch size, stop iterating so Cassandra
        # doesn't fetch the next chunk internally on the same connection
        if data_count == fetch_size:
            break
    # no paging state means there is no next chunk left to fetch
    if paging_state is None:
        next_page_available = False

This restricted Cassandra from internally fetching new chunks over an idle-looking connection, which made the connection timeouts go away.
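For completeness: the Python driver also has a built-in manual-paging API (has_more_pages and fetch_next_page()) that expresses the same idea on a single result set. A minimal sketch - though note it keeps one result set alive across all pages, so on its own it wouldn't have saved us from the timeout:

statement = SimpleStatement("SELECT * FROM my_cute_cassandra_table;", fetch_size=100)
results = session.execute(statement)

while True:
    # current_rows holds only the rows of the page fetched so far
    for data in results.current_rows:
        process_data_here(data)
    if not results.has_more_pages:
        break
    # synchronously fetch the next page, replacing current_rows
    results.fetch_next_page()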

So now that I end my ramblings, I hope you have learned something new. Correct me if you find any technical inaccuracies. And if you have read till here, I guess you will like my other write-ups too.