
GraceKelly: A best effort cache synchronization library for distributed systems

GraceKelly is a best effort cache synchronization library for distributed systems.

In the following blog post I’ll explore the motivations for such a library and give a brief introduction to GraceKelly.

GraceKelly is open source and is available here: GraceKelly on Github

A Chaotic Place

The average visitor to Flipkart is not aware of the scale at which we operate. Day in and day out we handle millions of page views, thousands of transactions and millions of updates to things like price, availability, offers and recommendations. Under the hood of the calm, functional facade of the website, there is a complex network of services that are constantly interacting with each other.

We have services that handle everything ranging from search to product serviceability at a given pin code.

An arbitrary web request that hits one of our webservers can spawn a plethora of requests to a bunch of back-end services, which in turn might be dependent on other services.

The back-end services respond with different responses at variable latencies; the responses are collated, made sense of and transformed, and finally a web response is created and served to the user as a web page.
The variability of the requests and responses that traverse the complex network of services while being transformed, multiplexed, demultiplexed and altered makes for a chaotic environment.

[Figure: Distributed Service Environment]

Chaos means unpredictability, and unpredictability is bad. When a user requests a page, the page load time must be predictable. When a product goes out of stock, the amount of time it takes to reflect on the product page needs to be predictable. We need predictability around service SLAs. Service SLAs are dependent on the load under which the service is operating, which means we need predictability around service load as well. We can’t operate in an environment where one minute a single server is able to handle production traffic and the next minute a whole cluster is buckling under the load. So we try to grab and hold on to as much predictability as we can, wherever possible.

Caches to the rescue

Caches act as sentinels in a distributed service environment. Although their primary function is to reduce latency, when used appropriately they excel at bringing predictability to a system. This is because a cache request is extremely predictable, with almost no variability either in response time or in load per request, which comes down to the simple data access pattern of a cache request. If a given front-end request hits caches at all the back-end services, we can predict with high confidence the load and response latency of that request on each service. One could say that there is a positive correlation between the percentage of cache hits and the predictability of a system/environment.


Throwing a spanner in the works

Every time there is a cache miss, both our distributed environment and its SLAs become a little bit more vulnerable. In the face of these risks a common pattern of cache usage seems inappropriate. One of the most common ways of updating the data stored in caches is to have an expiry ttl for every cache entry. Once this time to live expires, the cache entry is removed from the cache and is no longer accessible until another request repopulates/synchronizes the cache. Using an expiry ttl in this way exposes the underlying system to potentially harmful request-pattern load for the duration of synchronization. Imagine a sequence of events like the following:

  • t0 – a heavily requested cache entry c1 expires
  • t1 – there is a cache miss for c1 and a request is sent to the service to fulfill
  • t2 – the cache has been repopulated with c2

The time between t1 and t2 is the duration of exposure. During that time all requests for c1 that miss the cache are let through into the distributed environment. The predictability of the target service, and of all the services it depends on, is affected during this time by the per-request load and the qps of all requests that result in a cache miss for c1. Caches could be updated better than this.
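To make the exposure concrete, here is a minimal sketch (not GraceKelly code) of the familiar expire-then-reload pattern, written in plain Java; loadFromService is a hypothetical stand-in for whatever back-end call repopulates the entry. Every caller that finds the entry missing or expired goes to the back-end, so a heavily requested key can translate into many concurrent back-end requests during the window between t1 and t2.

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.function.Function;

    /** Naive read-through cache with an expiry ttl, shown only to illustrate the exposure window. */
    public class ExpiringCache<K, V> {

        private static final class Entry<V> {
            final V value;
            final long expiresAtMillis;
            Entry(V value, long ttlMillis) {
                this.value = value;
                this.expiresAtMillis = System.currentTimeMillis() + ttlMillis;
            }
            boolean isExpired() {
                return System.currentTimeMillis() > expiresAtMillis;
            }
        }

        private final ConcurrentMap<K, Entry<V>> store = new ConcurrentHashMap<>();
        private final long ttlMillis;

        public ExpiringCache(long ttlMillis) {
            this.ttlMillis = ttlMillis;
        }

        public V get(K key, Function<K, V> loadFromService) {
            Entry<V> entry = store.get(key);
            if (entry == null || entry.isExpired()) {
                // The exposure window: between expiry (t0/t1) and repopulation (t2),
                // every request that reaches this branch hits the back-end service.
                V fresh = loadFromService.apply(key);
                store.put(key, new Entry<>(fresh, ttlMillis));
                return fresh;
            }
            return entry.value;
        }
    }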

Refresh, don’t expire

Refreshing the cache without removing the cache entry solves the problem of exposure that cache expiry brings. In a cache refresh strategy, once a value is cached, all requests for the value are served out of the cache and don’t hit the service or services at the back-end. Periodically the cache is synchronized with values from back-end services to keep the data up to date and consistent with back-end systems. This means that for all the values that are cached, the load on the back-end systems is extremely predictable, and at the same time the response latencies for these cached values are highly predictable.

Many services/systems would be better served by refreshing the cache rather than expiring it. The efficacy of such a strategy depends on the kind of service in question. For services that have zero tolerance for stale data, best effort refreshing instead of expiring the cache entry doesn’t make sense. However, many services can tolerate stale data to a certain degree. For example, a stock availability service cannot accommodate stale data, while a review and rating service can still have stale data cached for a little while.

There are some popular strategies that are used to implement a refreshing cache.

  1. Global TTL, with a refreshing process: the most common way of implementing a refreshing cache is to run a separate process or thread that periodically refreshes all the entries in the cache. The shortcoming of this strategy is that it is only appropriate where there is no variability in the staleness of the data that is cached. e.g. a search engine service’s cache can be refreshed once every 30 minutes if re-indexing happens only once every 30 minutes.
  2. Fine grained TTL, with a monitoring & refreshing process: in this strategy, a separate process or thread constantly monitors the cache entries to see which of them have expired and refreshes them accordingly. This approach gives finer grained control over the cache refresh lifecycle of each cache entry. However, running a separate process means one more component in your environment that needs to be monitored and maintained (a minimal sketch of this approach follows the list).
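As a rough illustration of the second strategy, the sketch below (assumed names, plain JDK types, not production code) uses a single scheduled thread to scan an in-memory map and refresh entries whose ttl has lapsed, so readers never see a missing entry; the scanner thread is precisely the extra component that has to be monitored and maintained.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Function;

    /** Fine grained TTL with a monitoring & refreshing thread, in miniature. */
    public class RefreshingCache<K, V> {

        private static final class Entry<V> {
            volatile V value;
            volatile long refreshAtMillis;
            Entry(V value, long ttlMillis) {
                this.value = value;
                this.refreshAtMillis = System.currentTimeMillis() + ttlMillis;
            }
        }

        private final Map<K, Entry<V>> store = new ConcurrentHashMap<>();
        private final Function<K, V> loader;   // reloads a value from the back-end
        private final long ttlMillis;
        private final ScheduledExecutorService scanner =
                Executors.newSingleThreadScheduledExecutor();

        public RefreshingCache(Function<K, V> loader, long ttlMillis, long scanPeriodMillis) {
            this.loader = loader;
            this.ttlMillis = ttlMillis;
            // The extra component this strategy introduces: a monitoring/refreshing process.
            scanner.scheduleAtFixedRate(this::refreshExpired,
                    scanPeriodMillis, scanPeriodMillis, TimeUnit.MILLISECONDS);
        }

        public V get(K key) {
            Entry<V> entry = store.get(key);
            return entry == null ? null : entry.value;   // stale reads are acceptable
        }

        public void put(K key, V value) {
            store.put(key, new Entry<>(value, ttlMillis));
        }

        private void refreshExpired() {
            long now = System.currentTimeMillis();
            for (Map.Entry<K, Entry<V>> e : store.entrySet()) {
                Entry<V> entry = e.getValue();
                if (now >= entry.refreshAtMillis) {
                    try {
                        entry.value = loader.apply(e.getKey());   // refresh in place, don't remove
                        entry.refreshAtMillis = now + ttlMillis;
                    } catch (RuntimeException ex) {
                        // Best effort: on failure keep serving the stale value.
                    }
                }
            }
        }

        public void shutdown() {
            scanner.shutdown();
        }
    }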

What would be good to have is a cache library with regular caching semantics, but one that accommodates refreshing a cache entry rather than expiring it based on its ttl. This is exactly what GraceKelly is; it’s inspired by Google Guava’s LoadingCache.

Cache me if you can

GraceKelly is a best effort cache synchronization library that tries its best to refresh any cache entry that has expired. The refresh lifecycle is solely request triggered and doesn’t monitor or maintain the cache. This means the refresh is not started when a cache entry expires, but rather when the first request for an expired cache entry is made. It is best effort because if synchronization/refresh of a cache entry fails, it can fall back to the stale version of the data already present in the cache. A conceptual sketch of this behavior follows the lists below.

For every request

  • It looks up the cache and returns the value if a cache entry is present.
  • If the returned cache entry has expired it dispatches a task to refresh the cache entry.
  • If for some reason the refresh fails, it can extend the ttl of the existing entry or do nothing.

Note that a cache entry is never removed (though it can be evicted by size constraints). This enables us to

  • Shield the backend services and systems from exposure to unnecessary request load.
  • Decouple response SLAs from backend degradation and availability concerns, thereby allowing for graceful degradation with stale data as a fallback.
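The sketch below is a conceptual illustration of this request-triggered, best-effort refresh, written against plain JDK types; the class and method names are illustrative and are not GraceKelly’s API (the library’s actual components are described in the next section).

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.function.Function;

    /** Conceptual sketch of request-triggered, best-effort refresh; not GraceKelly's API. */
    public class RefreshOnReadCache<K, V> {

        private static final class Entry<V> {
            volatile V value;
            volatile long expiresAtMillis;
            Entry(V value, long expiresAtMillis) {
                this.value = value;
                this.expiresAtMillis = expiresAtMillis;
            }
        }

        private final ConcurrentMap<K, Entry<V>> store = new ConcurrentHashMap<>();
        private final Function<K, V> loader;   // reloads a value from the back-end
        private final long ttlMillis;
        private final ExecutorService refresher = Executors.newFixedThreadPool(2);

        public RefreshOnReadCache(Function<K, V> loader, long ttlMillis) {
            this.loader = loader;
            this.ttlMillis = ttlMillis;
        }

        public V get(K key) {
            Entry<V> entry = store.get(key);
            if (entry == null) {
                return null;   // genuine miss: the caller loads and put()s
            }
            if (System.currentTimeMillis() > entry.expiresAtMillis) {
                // Expired, but never removed: serve the stale value and refresh in the background.
                // (A real implementation would de-duplicate concurrent refreshes of the same key.)
                refresher.submit(() -> refresh(key, entry));
            }
            return entry.value;
        }

        public void put(K key, V value) {
            store.put(key, new Entry<>(value, System.currentTimeMillis() + ttlMillis));
        }

        private void refresh(K key, Entry<V> entry) {
            try {
                entry.value = loader.apply(key);
                entry.expiresAtMillis = System.currentTimeMillis() + ttlMillis;
            } catch (RuntimeException e) {
                // Best effort: on failure, extend the ttl of the stale entry (or do nothing).
                entry.expiresAtMillis = System.currentTimeMillis() + ttlMillis;
            }
        }
    }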

The Library

The GraceKelly library consists of a single class, Kelly, that takes implementations of two interfaces, a CacheProvider and a CacheLoader. They pass around a generic type, CacheEntry. A rough illustration of these components follows the list below.

  • Kelly: This is the core of the library that acts as a proxy to CacheProvider and is responsible for reloading the cache using the CacheLoader.
  • CacheProvider: Interface whose implementation provides the actual caching functionality, e.g. a CacheProvider implementation for Couchbase, or a CacheProvider wrapper around a ConcurrentHashMap.
  • CacheLoader: Interface whose implementation allows one to reload a CacheEntry based on the key and value of the expiring CacheEntry.
  • CacheEntry: Generic type that contains key, value and ttl information.
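To make the shapes of these components concrete, here is a rough, self-contained illustration of the interfaces together with the ConcurrentHashMap-backed CacheProvider mentioned above. The method names are paraphrased from the descriptions in this post rather than copied from the library, so consult the GitHub repository for the actual signatures.

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    /** Generic type carrying key, value and ttl information (paraphrased, illustrative only). */
    class CacheEntry<K, V> {
        final K key;
        final V value;
        final long ttlSeconds;
        CacheEntry(K key, V value, long ttlSeconds) {
            this.key = key;
            this.value = value;
            this.ttlSeconds = ttlSeconds;
        }
    }

    /** Implementations wrap the actual cache, e.g. Couchbase or a ConcurrentHashMap. */
    interface CacheProvider<K, V> {
        CacheEntry<K, V> get(K key);
        void put(CacheEntry<K, V> entry);
    }

    /** Implementations rebuild a CacheEntry from the key and value of the expiring one. */
    interface CacheLoader<K, V> {
        CacheEntry<K, V> reload(K key, V expiringValue);
    }

    /** The in-memory CacheProvider mentioned above: a thin wrapper around a ConcurrentHashMap. */
    class MapCacheProvider<K, V> implements CacheProvider<K, V> {
        private final ConcurrentMap<K, CacheEntry<K, V>> map = new ConcurrentHashMap<>();

        @Override
        public CacheEntry<K, V> get(K key) {
            return map.get(key);
        }

        @Override
        public void put(CacheEntry<K, V> entry) {
            map.put(entry.key, entry);
        }
    }

Kelly would then be constructed with such a CacheProvider and a CacheLoader that knows how to call the owning back-end service.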

GraceKelly is open source and is available here with example code and documentation: GraceKelly on Github

Choosing a Datastore For The Flipkart User Engagement Platform

A User Engagement Platform

Tersely defined, a “User Engagement Platform” is a platform that solicits, accepts and displays user content and actions. These collections of content or actions are usually associated with both user and non-user entities.

Such a platform enables engagement paradigms like reviews, user lists, ratings, votes, comments, karma… etc. It’s easy to imagine such a system being used by millions of users, who are constantly submitting and accessing varied and rich content.
This access pattern implies near real-time updates of stats, ratings, votes, likes, plays, views, etc. at scale. It creates loads of data, much of which is volatile and ephemeral in nature.

In short, imagine a customer engagement platform for Flipkart, with about a million unique visitors per day, most of whom are actively engaging with the site and the community. We envision our user engagement platform handling such a load pattern with ease and being ready to scale further. One of the most critical operational components in such a system is the data persistence layer. Ideally, this persistence layer should be highly performant, horizontally scalable with minimal overhead, consistent, always available and partition tolerant. But…..

CAP: Or why you can’t have your cake and eat it too

The CAP theorem, or Brewer’s theorem, states that it is impossible for a distributed computing system to simultaneously provide all three of the following guarantees:

  1. Consistency: All nodes see the same data at the same time
  2. Availability: Every request receives a response about whether it succeeded or failed
  3. Partition tolerance: the system continues to operate despite arbitrary message loss or failure of part of the system.

The implications of the CAP theorem are crucial when selecting the persistence layer for an application or service. One has to carefully analyze the app’s or service’s requirements and pick the appropriate CAP guarantees that the prospective persistence layer must support.

Data access patterns for User engagement services

Historically, the data access pattern for user generated content was low on writes and heavy on reads. However, modern social engagement patterns around user content have made it write heavy as well, to accommodate more likes, more ratings, more upvotes, more comments, more shares. So today’s and tomorrow’s engagement services should accommodate heavy write loads, heavy read loads, and heavy aggregate (counter) modify-and-read loads. Looked at this way, it becomes apparent that aggregation needs to be a first class function of engagement services, one that is near real time, scalable and highly available.

Eventual consistency and User experience

At the same time, we can also note that most of today’s engagement-heavy applications trade off strict consistency for eventual consistency, to achieve better scalability through horizontal partitioning as well as better availability. The extent to which a congruent user experience can be pulled off with eventual consistency differs greatly. Reddit’s upvote experience is a good example of using eventual consistency without it adversely affecting how the user perceives consistency on the platform.

YouTube’s “301 Views” for a video with “20000 likes” falls at the other end of that spectrum. So, with careful application and service design, effective tradeoffs on data consistency can be made without hurting the user experience. By doing this we immediately free ourselves from the “C” constraint of CAP, which leaves us free to pursue the “Availability and Partition tolerance” guarantees that are very much desired in this context. The following section gives a brief example of the kind of use case our engagement platform should support and what it implies for the persistence layer.

A playlist of the people

Imagine a community-created playlist on Flipkart’s Flyte: a playlist where people add songs to a global playlist and then upvote or downvote songs added by other users. Users should have an always-on experience, and neither their submissions nor their votes should be missed; the implication is that there shouldn’t be a single point of failure in the write path. Hundreds or thousands of users could be simultaneously upvoting or downvoting the same song, so locking of counters should be avoided and distributed counters preferred. Not every user needs to see the same view of the data, as the data is very transient to begin with, so eventual consistency should do fine. Given the massive amount of user engagement, the sort order of the playlist is going to change very often, so one should avoid query-time sorting and prefer data that is natively sorted. Adequate clarity around such usage scenarios enabled us to confidently transition from requirements assessment to technology selection.

Technology Selection: What we want from a persistence layer

Let us consider the attributes of an appropriate persistence layer for engagement services.

  • Availability and Partition tolerance, with tunable consistency: as discussed above we should be able to trade off on consistency to accommodate highly available and partition tolerant engagement services.
  • Linear Scalability: In the face of massive amounts of content being created by users, the system should be able to scale without degrading performance.
  • Operational Efficiency: The operational requirements of the highly available, distributed, partition tolerant persistence layer should be minimal.
  • Community Support: There should be a thriving community of users and developers that is both effective and helpful.
  • Parsable Code Base: The code base should be grokkable in both size and complexity, with either good documentation or help from the community.
  • Professional Support: It is preferable to have companies that are providing professional support for the platform.

Though this isn’t an exhaustive list, it’s a good starting point to explore different alternatives. However, there are also functional requirements of the persistence layer that must be considered.

  • Aggregators are a first class concern: aggregator modification is potentially the heaviest write-load element of an engagement service, so the persistence layer should support highly performant aggregator modification over a distributed infrastructure.
  • Sorting is a first class concern: most user generated content is going to be served in some sorted form, ranging from most recent first (like a news feed) to sorted by helpfulness. Sorting large amounts of data should be handled in a highly efficient manner.
  • Multiple pivot points for data elements: each complex data entity should be accessible by its attributes, through a reverse index or filtering.
  • Offline stats with map reduce: the persistence layer should support map reduce on data natively, or should be easy to plug into a map reduce framework like Hadoop.
  • Search integration: text search should either be native or easily pluggable into the persistence layer.
  • Selectable/Updatable individual attributes: Attributes of a data entity should be individually selectable and updatable.
  • Schema Less: The data model should be flexible and should not impose constraints on what and how much data is stored. Schema-less data stores, such as columnar or key-value stores, provide great data-model flexibility.
  • Native support for ephemeral data: engagement services are going to generate lots of data of an ephemeral nature, i.e. data that is important/valid only for a short period of time. Ephemeral data should not clog up the system.
  • Replication should be a first class concern: replication and replication awareness should be deeply integrated into the design and implementation of the database.

Cassandra

Considering all the above factors we ruled out traditional RDBMSs and ventured into the wild west of databases, aka “NoSQL”. After evaluating and eliminating a bunch of databases (document stores, key-value stores, Riak – attributes not selectable/updatable, HBase – caters to consistency and partition tolerance…), we arrived at Cassandra. Cassandra purportedly supports many of the above-mentioned functional and non-functional requirements.

The Good

  • Online load balancing and cluster growth: Cassandra is designed from the ground up to be deployed on a cluster. Growing and shrinking a cluster is transparent, and Cassandra deals gracefully with node loss in a cluster.
  • Flexible Schema: Cassandra is a column oriented database and there is inherent flexibility in the schema for the data.
  • Key Oriented Queries: All queries are key-value oriented, making the database highly performant when appropriately configured.
  • CAP aware: Cassandra is CAP aware and lets one trade off consistency against latency while retaining availability and partition tolerance. Consistency can be configured at different levels, and can even be tuned per query (see the sketch after this list).
  • No SPF: No single point of failure. Cassandra can be configured to be incredibly resilient in the face of massive node loss in the cluster. This is because replication is a first class function of Cassandra.
  • DC and rack aware: Cassandra was built to be datacenter and rack aware, and can be configured with different strategies to achieve either better DR (disaster recovery) resiliency or minimal latency.
  • Row and Key Caching: Row-level and key-level caching are baked into Cassandra. This makes Cassandra a viable persistent cache.
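For example, per-query consistency tuning looks roughly like the sketch below when using the DataStax Java driver; the post itself does not name a client library, and the contact point, keyspace, table and column names here are hypothetical.

    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.ConsistencyLevel;
    import com.datastax.driver.core.Session;
    import com.datastax.driver.core.SimpleStatement;

    public class PerQueryConsistency {
        public static void main(String[] args) {
            // Contact point and keyspace are placeholders for illustration.
            Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
            Session session = cluster.connect("engagement");

            // A read that should reflect the latest acknowledged writes: QUORUM.
            SimpleStatement strongRead =
                    new SimpleStatement("SELECT votes FROM songs WHERE song_id = 42");
            strongRead.setConsistencyLevel(ConsistencyLevel.QUORUM);
            session.execute(strongRead);

            // A latency-sensitive read that can tolerate a slightly stale value: ONE.
            SimpleStatement fastRead =
                    new SimpleStatement("SELECT votes FROM songs WHERE song_id = 42");
            fastRead.setConsistencyLevel(ConsistencyLevel.ONE);
            session.execute(fastRead);

            cluster.close();
        }
    }

QUORUM reads pay extra latency for stronger consistency, while ONE keeps latency low at the cost of possibly reading a slightly stale counter.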

The Bad

  • Limited Ad Hoc Querying Capability: due to the absence of a query language as comprehensive as SQL, ad hoc querying is severely limited on a Cassandra cluster.
  • Tight coupling of data model: the way Cassandra data models are created is heavily dependent on their access patterns from the application layer. In contrast, an RDBMS models data based on entities and their relationships. Hence, the Cassandra data model is tightly coupled to application access patterns, and any app-level feature changes will impact the data model to a large degree.
  • No Codebase stability: Cassandra is still rapidly evolving and the codebase and feature set change constantly.
  • Bad documentation: the documentation is very sparse and outdated/defunct due to the rapid pace of change.
  • Lack of transactions: Cassandra does not have a mechanism for rolling back writes. Since it values AP out of CAP, there is no built-in transaction support.

The Ugly

  • Steep learning curve: the steep learning curve combined with having to sift through the internet to find appropriate documentation makes Cassandra very hard to get a handle on.
  • No Operational experience: Zero or low operational experience across the organization with Cassandra when compared with RDBMS systems.
  • Here be dragons: One of the scary prospects of going the Cassandra way is fear of the unknown. If something goes wrong, how will we debug, do an RCA and fix the issue in a timely manner without any tooling or experience?

Appendix I

                                       Cassandra          Riak                    HBase
CAP                                    AP                 AP                      CP
Language                               Java               Erlang, C, JavaScript   Java
Type                                   Column oriented    Key-value               Column oriented
Protocol                               Thrift / custom    REST / custom           REST / Thrift
Tunable tradeoffs for distribution
and replication                        Yes (N, R, W)      Yes (N, R, W)           No
Map/Reduce                             Through Hadoop     Built in                Through Hadoop
Distributed Counters                   Yes                No                      Yes
Built-in Sorting                       Yes                No (map reduce)         Yes