Pagination in Cassandra is one of those hair-pulling problems. Some time back, I encountered a use case where I had to implement pagination in Cassandra. Most of us have grown up in a beautiful world, surrounded by typical relational databases. Like any other developer, I always had trust in my friend BETWEEN for implementing pagination. Howdy-hooo!
But my happiness didn't last long when I found there is no such thing in Cassandra at all. With a heavy heart, I double-checked, but no luck.
BETWEEN is simply not available in a Cassandra query. For example, I can get the first 10 rows using the LIMIT operator, but not the next 10, because there is no clause to skip the rows already seen.
Even a column in Cassandra is not the same as a column in an RDBMS. It took a while for my mind to digest the way the Cassandra monster works.
Well, I will walk you through my story of this really interesting war with Cassandra paging for large queries, and the journey to conquer it (with actual code).
BONUS: you will learn how smart Cassandra is.
Our use case was pretty simple: pull everything from a Cassandra table, run heavy personalized processing on each row, and keep going.
Before you declare me the dumbest person of the year, let me explain why it was not as simple to implement as it looks.
SELECT * FROM my_cute_cassandra_table;
This gives me back all the rows, over which I can iterate and get my job done. Easy-peasy, right?
Let's first understand how smartly the SELECT * query is implemented in Cassandra.
Suppose you have a table with ~1B rows and you run -
SELECT * FROM my_cute_cassandra_table;
and store the result in a variable.
Hold on. It's not going to eat all the RAM.
Loading all 1B rows into memory would be painful and foolish. Instead of that silly approach, Cassandra does it in a very smart way: it fetches data in pages, so you don't have to worry about memory. It pulls just one chunk from the database (~5000 rows) and returns a cursor over the results, which you can iterate to see the rows.
Once our iteration gets close to the end of those 5000 rows, it internally fetches the next chunk of 5000 and adds it to the result cursor. It does this so brilliantly that we don't even feel the magic happening behind the scenes.
query = "SELECT * FROM my_cute_cassandra_table;"
results = session.execute(query)

for data in results:
    # processing the data
    process_data_here(data)
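To make the transparent paging concrete, here is a toy, pure-Python model of what the driver does behind the scenes (no Cassandra needed). The generator and its arguments are made up for the demo; the real driver pulls each chunk over the network instead of slicing a list, but the caller's experience is the same.

```python
def paged_results(rows, fetch_size=5000):
    """Yield rows one by one, fetching them in chunks of fetch_size."""
    offset = 0
    while offset < len(rows):
        # "network fetch" of one page
        chunk = rows[offset:offset + fetch_size]
        for row in chunk:
            yield row
        # move to the next page behind the scenes
        offset += fetch_size

# the caller just iterates; the chunking is invisible
total = sum(1 for _ in paged_results(range(12000), fetch_size=5000))
print(total)  # 12000
```

The caller never sees a page boundary, which is exactly why the approach feels magical, and also exactly why it bit us, as described next.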
I know, that’s a really smart approach, but it became a bottleneck for us.
Whatever data we fetched from Cassandra, we needed to run some extensive processing over each payload, which itself took time.
Since iterating over a chunk was slow, by the time we reached the end of the chunk, Cassandra assumed the connection was not being used and closed it automatically, yelling timeout.
As we had a lot of data to fetch and process, we needed a smart way to paginate the data ourselves and streamline the issue.
How did I solve it?
We deep-dived into Cassandra configurations and found that whenever Cassandra returns a result cursor, it brings a paging state with it. The paging state is an opaque token that tells Cassandra where to resume, i.e. which chunk to fetch next.
from cassandra.query import SimpleStatement

query = "SELECT * FROM my_cute_cassandra_table;"
statement = SimpleStatement(query, fetch_size=100)
results = session.execute(statement)

# save the paging state
paging_state = results.paging_state

for data in results:
    process_data_here(data)
We changed our approach a bit, in a tricky way. Based on our use case, we set the fetch size (the size of each chunk, given manually by us to Cassandra). And whenever we got a result cursor, we saved its paging state in a variable.
We also put a check on a counter: once it reaches the fetch size, we break out of the iteration and fetch a fresh new chunk ourselves, using the paging state we saved.
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import SimpleStatement

# connection with cassandra
cluster = Cluster(
    ["127.0.0.1"],
    auth_provider=PlainTextAuthProvider(username="pankaj", password="pankaj"),
)
session = cluster.connect()

# setting keyspace
session.set_keyspace("my_keyspace")

# set fetch size
fetch_size = 100


# fetches a fresh chunk starting from the given paging state
def fetch_a_fresh_chunk(paging_state=None):
    query = "SELECT * FROM my_cute_cassandra_table;"
    statement = SimpleStatement(query, fetch_size=fetch_size)
    return session.execute(statement, paging_state=paging_state)


next_page_available = True
paging_state = None

while next_page_available:
    # fetch a new chunk with the saved paging state
    results = fetch_a_fresh_chunk(paging_state)
    paging_state = results.paging_state

    # a None paging state means this was the last chunk
    if paging_state is None:
        next_page_available = False

    data_count = 0
    for data in results:
        # process payload here
        # ...
        # payload processed
        data_count += 1

        # once we reach fetch_size, we stop before Cassandra
        # internally fetches the next chunk
        if data_count == fetch_size:
            break
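To see the shape of this paging-state loop without a live cluster, here is a runnable sketch against a fake in-memory session. FakeSession, FakePage, and their fields are invented for the demo (a real paging state is an opaque token, not an integer offset); only the loop structure mirrors the driver usage.

```python
class FakePage:
    """Stands in for a driver result cursor: rows plus a paging state."""
    def __init__(self, rows, paging_state):
        self.rows = rows
        self.paging_state = paging_state  # None on the last page

    def __iter__(self):
        return iter(self.rows)


class FakeSession:
    """Serves rows from a list in fetch_size-sized pages."""
    def __init__(self, rows, fetch_size):
        self.rows = rows
        self.fetch_size = fetch_size

    def execute(self, paging_state=None):
        start = paging_state or 0
        end = start + self.fetch_size
        # no state left once we run past the end of the data
        next_state = end if end < len(self.rows) else None
        return FakePage(self.rows[start:end], next_state)


session = FakeSession(rows=list(range(250)), fetch_size=100)

processed = []
paging_state = None
while True:
    # fetch one chunk, remember where the next one starts
    results = session.execute(paging_state=paging_state)
    paging_state = results.paging_state
    for row in results:
        processed.append(row)  # process payload here
    if paging_state is None:  # no more pages left
        break

print(len(processed))  # 250
```

Each trip through the loop is one explicit fetch, so nothing happens "internally" between chunks and the connection is never left idle mid-chunk.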
This restricted Cassandra from internally fetching new chunks mid-processing, and the connection timeouts went away.
So now that I end my ramblings, I hope you have learned something new. Correct me if you find any technical inaccuracies. And if you have read till here, I guess you will like my other write-ups too.