Is Data Locality Always Out of the Box in Hadoop? Not Really!

Introduction

Hadoop optimizes for data locality: moving compute to data is cheaper than moving data to compute. It can schedule tasks on the nodes that hold their input data, which results in high performance. But is this really out of the box? Not always! This blog explains a couple of data locality issues that we identified and fixed.

User Insights Team

Flipkart collects terabytes of user data such as browse, search and order signals. The User Insights team uses these signals to derive useful insights about the user. These can be broadly classified as one of the following:

  • Direct Insights – directly derived from user data, e.g. location.
  • Descriptive Insights – describe the user: preferences, likes/dislikes, habits, etc., e.g. brand affinities, product category affinities, life-stage.
  • Predictive Insights – predict the next behavior or event of a user, e.g. probability of purchase in a session, or of cancellation/return.

These insights are used to personalize the user experience, give better recommendations and target relevant advertisements.

User Insights

HBase as the data store for User Insights

The collected data is stored and processed on a distributed Hadoop cluster. HBase is used for fast lookup of user signals during the generation of insights.

Multiple mappers hitting the same HBase region server

At times, full HBase table scans are required. During these scans, some mappers finished in less than 10 minutes whereas many took hours, for HBase regions of approximately the same size. We picked the regions that were taking hours and ran them one by one; to our surprise, they also finished in under 10 minutes. But when run alongside other regions on the same region server, they slowed down. We started profiling the nodes where the jobs were running very slowly and saw that CPU and memory were fine, but the network was choking. Using network profiling tools like nload and iftop, we saw that outbound traffic was maxing out the bandwidth of the network card and slowing down outbound transfers. iftop showed that most of the outbound traffic was coming from the HBase region server process.

We suspected that the way the input data was split and maps were launched caused too many mappers to hit the same region server. Digging into the HBase code that creates the TableSplits in TableInputFormat for scan jobs, we found that if the scan range has consecutive regions belonging to the same region server, the splits are returned in sequential order, and mappers are launched and allocated in that same order. This causes all the mappers to hit the same region server and choke its network bandwidth. We wrote the RoundRobinTableInputFormat below, which orders the splits in a round-robin fashion across the region servers in use, thereby reducing the probability of many mappers hitting the same region server.

Code
Gist Link: https://gist.github.com/sudhir-reddy/4eb430d4be9f9f404c38
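
The full implementation is in the gist above; the following is a minimal sketch of the idea, assuming the standard org.apache.hadoop.mapreduce APIs (the helper structure here is illustrative, not the exact gist code):

//Sketch: regroup splits so that consecutive mappers hit different region servers
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;

public class RoundRobinTableInputFormat extends TableInputFormat {

    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException {
        List<InputSplit> splits = super.getSplits(context);

        // Bucket splits by the region server (first location) hosting them.
        Map<String, List<InputSplit>> byServer = new LinkedHashMap<>();
        for (InputSplit split : splits) {
            String server;
            try {
                String[] locations = split.getLocations();
                server = locations.length > 0 ? locations[0] : "unknown";
            } catch (InterruptedException e) {
                throw new IOException(e);
            }
            byServer.computeIfAbsent(server, k -> new ArrayList<>()).add(split);
        }

        // Interleave: one split per server per round, so mappers launched in
        // order spread their load across region servers.
        List<InputSplit> ordered = new ArrayList<>(splits.size());
        boolean added = true;
        while (added) {
            added = false;
            for (List<InputSplit> serverSplits : byServer.values()) {
                if (!serverSplits.isEmpty()) {
                    ordered.add(serverSplits.remove(0));
                    added = true;
                }
            }
        }
        return ordered;
    }
}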

YARN Fair Scheduler and Data Locality

Even when the cluster was free, we observed only 5-10% data locality for a job. Something seemed suspicious, so we started looking at how data locality happens in YARN. The client submits a job to the YARN Resource Manager (RM); the RM allocates a container for running the Application Master (AM) on one of the cluster nodes. The data to be processed is split into an InputSplit per mapper task; each InputSplit carries information on where its data is located. The AM asks the RM for containers to run the mapper tasks, preferably on the nodes where the data is located. The RM uses the scheduler config to figure out which level of data locality to offer each container request. The RM allocates containers to run on Node Managers (NM), and the AM executes the mapper tasks in the allocated containers.

YARN Resource Manager

We are using the FairScheduler in our YARN setup. FairScheduler has the configs yarn.scheduler.fair.locality.threshold.node and yarn.scheduler.fair.locality.threshold.rack to tweak data locality. On setting these values to maximum data locality (=1), we did not observe any change in task data locality. Digging deeper into the scheduler code, we found that these values have no effect without also setting the undocumented configs yarn.scheduler.fair.locality-delay-node-ms and yarn.scheduler.fair.locality-delay-rack-ms. Even after setting these, not a single mapper was going to a data-local node. Looking further into the scheduler and container assignment code, we figured out that node-local requests are allocated only if the node name registered by the YARN Node Manager exactly matches the node name for which the Application Master is requesting a container. The YARN node manager console showed that hosts were registered with non-FQDN names (non-Fully-Qualified Domain Names, e.g. hadoop01), while HBase region servers were returning data locality hints with FQDNs (hadoop01.something.flipkart.com). These hosts had non-FQDN names in /etc/hostname and in the kernel (as seen via the `hostname` command). After updating the hostnames to FQDNs and re-running the job, we got around 97.6% data locality, and the job's runtime was reduced by 37%! When we went to file a bug on YARN, we found that one had already been filed and fixed in YARN 2.1.0. If you are using a YARN version older than 2.1.0, set the hostname to the FQDN to get better data locality.
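
For reference, the settings discussed above live in yarn-site.xml; the snippet below is illustrative, and the delay values are examples rather than recommendations:

<!-- Illustrative yarn-site.xml snippet; values are examples, not recommendations -->
<property>
  <name>yarn.scheduler.fair.locality.threshold.node</name>
  <value>1.0</value>
</property>
<property>
  <name>yarn.scheduler.fair.locality.threshold.rack</name>
  <value>1.0</value>
</property>
<!-- Undocumented: the thresholds above had no effect without these -->
<property>
  <name>yarn.scheduler.fair.locality-delay-node-ms</name>
  <value>3000</value>
</property>
<property>
  <name>yarn.scheduler.fair.locality-delay-rack-ms</name>
  <value>6000</value>
</property>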

The above were two instances that caution us against taking data locality for granted in Hadoop; they are worth addressing by teams that run large Hadoop-based MR workloads.

Foxtrot – Event Analytics at Scale

Health and monitoring of actors participating in an SOA-based distributed system like the one we have at Flipkart is critical for production stability. When things go south (and they do), not only do we need to know about the occurrence instantly, but we also need to get down to exactly when and where the problem happened. This points to the need for access to aggregated metrics as well as raw events that can be sliced and diced at will. With server-side apps spanning hundreds of virtual machines and client-side apps running on millions of diverse phones, this need gets amplified. Individual requests need to be tracked across systems, and issues found and reported well before customers get impacted.

Introducing Foxtrot

Foxtrot is a system built to ingest, store and analyze rich events from multiple systems, providing aggregated metrics as well as the ability to slice, dice and drill down into raw events.

Foxtrot in action

Foxtrot provides a configurable console to get a view of systems; enables people to dig into as well as aggregate raw events across hosts and/or client devices; and provides a familiar SQL based language to interact with the system both from the console as well as from the command line.

A case for event analytics at scale

Let’s consider a few real-life situations:
  • Each day, hundreds of thousands of people place orders at Flipkart through the Checkout system. The system interacts with a bunch of downstream services to make this possible. Any of these services failing degrades the buying experience considerably. We strive to stay on top of such situations: we proactively monitor service call rates and response times, and quickly stop, debug and re-enable access to degraded services.
  • Mobile apps need to be fast and efficient both in terms of memory and power. Interestingly, we have very little insight into what is actually happening on the customer's device as they interact with the apps. Getting proper data about how our app affects these devices helps us make it faster, lighter and more power efficient.
  • Push notifications bring important snippets of information and great offers to millions of users of our apps. It is of critical importance for us to know what happens to these messages, and to control the quality and frequency of these notifications.
  • Book publishers push data like book names and authors to us. We process and publish this data to be shown on the apps and the website. We need to monitor these processing systems to ensure things are working properly and to quickly catch and fix any problems.

These and many more use-cases are currently being satisfied by Foxtrot at Flipkart and Olacabs.

Basic Abstractions

Foxtrot works on the following very simple abstractions:

  • Table – The basic multi-tenancy container used to logically separate data coming in from different systems. So your Foxtrot setup might have tables for website, checkout, oms, apps etc. A table in Foxtrot has a TTL, and the data no longer remains queryable from FQL, the console and JSON queries (see below) once the TTL expires. Data for the table is saved in HBase and can be used independently by M/R jobs on Hadoop, depending on the expiry time specified for the column family in the table.
  • Document - Most granular data/event representation in Foxtrot. A document is composed of:
    • id – A unique id for the document. This must be provided.
    • timestamp – A timestamp representing the time of generation of this event.
    • data – A json node with fields representing the metadata for the event.
//A sample document
{
    "id": "569b3782-255b-48d7-a53f-7897d605db0b",
   "timestamp": 1401650819000,
    "data": {
        "event": "APP_LOAD",
        "os": "android",
        "device": "XperiaZ",
        "appVersion": {
            "major": "1",
            "minor": "2"
        }
    }
}

Usage

Foxtrot has been designed ground-up for simplicity of use. The following sections detail how events can be pushed to Foxtrot and how they can be summarized, viewed and downloaded.

Pushing data to Foxtrot

Pushing data involves POSTing documents to the single or bulk ingestion API. For more details on ingestion APIs please consult the wiki.
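
As a minimal illustration, here is a Java sketch of POSTing the sample document shown earlier; the endpoint URL below is a placeholder, not Foxtrot's actual path, so consult the wiki for the real single and bulk ingestion APIs:

//Sketch: POST a document to a (placeholder) ingestion endpoint
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class FoxtrotIngestExample {
    public static void main(String[] args) throws Exception {
        String document = "{"
            + "\"id\": \"569b3782-255b-48d7-a53f-7897d605db0b\","
            + "\"timestamp\": 1401650819000,"
            + "\"data\": {\"event\": \"APP_LOAD\", \"os\": \"android\"}"
            + "}";

        // Placeholder endpoint; substitute the actual ingestion API path from the wiki.
        URL url = new URL("http://foxtrot.example.com/document/test");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(document.getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("HTTP status: " + conn.getResponseCode());
    }
}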

Accessing data from Foxtrot

Once events are ingested into Foxtrot, they can be accessed through any of the many interfaces available.

Foxtrot Console (Builder)


Foxtrot provides a console system with configurable widgets that can be backed by JSON queries. Once configured and saved, these consoles can be shared by URL and accessed from the consoles dropdown.

Currently, the following widgets are supported:

  • Pie charts - Show comparative counts of different event types for a given time period
  • Bar charts - Show comparative counts of different event types for a given time period
  • Histogram - Show counts of events bucketed by mins/hours/days over a time period
  • Classified histogram - Show counts of multiple types of events bucketed by mins/hours/days over a time period
  • Numeric Stats histogram - Show stats like max, min, avg and percentiles over numeric fields, bucketed by mins/hours/days over a time period
  • Tables - Auto-refreshing tabular data display based on an FQL query (see below)

Each of these widgets come with customizable parameters like:

  • Time window of query
  • Filters
  • Whether to show a legend, etc.

Foxtrot Query Language

Big users of these kinds of systems are our analysts, and they really love SQL. We therefore support a subset of SQL that we call FQL (obviously!).

Example: find the count of all app loads and app crashes in the last one hour, grouped by operating system:

select * from test where eventType in ('APP_LOAD', 'APP_CRASH') and last('1h') group by eventType, os

Results:
+-----------+---------+---------+
| eventType | os      |   count |
+-----------+---------+---------+
| APP_CRASH | android |   38330 |
| APP_CRASH | ios     |    2888 |
| APP_LOAD  | android | 2749803 |
| APP_LOAD  | ios     |   35380 |
+-----------+---------+---------+

FQL queries can be run from the console as well by accessing the “FQL” tab.

Details about FQL can be found in the wiki.

CSV Downloads

Data can be downloaded from the system by POSTing a FQL query to the /fql/download endpoint.

Json Analytics Interface

This is the simplest and most fundamental mode of access to data stored in Foxtrot. Foxtrot provides the /analytics endpoint, which can be hit with simple JSON queries to access and analyze the data that has been ingested.

Example: find the count of all app loads in the last one hour, grouped by operating system and version:

//Request
{
    "opcode": "group",
    "table": "test",
    "filters": [
        {
            "field": "event",
            "operator": "in",
            "value": "APP_LOAD"
        },
        {
            "operator" : "last",
            "duration" : "1h"
        }
    ],
    "nesting": [
        "os",
        "version"
    ]
}
//Response
{
    "opcode": "group",
    "result": {
        "android": {
            "3.2": 2019,
            "4.3": 299430,
            "4.4": 100000032023
        },
        "ios": {
            "6": 12281,
            "7": 23383773637
        }
    }
}

A list of all analytics and their usage can be found in the wiki.

Raw access

The raw data is available on HBase for access from map-reduce jobs.

Technical Background

Foxtrot was designed and built for scale, with optimizations in both code and query paths to give the fastest possible experience to the user.

Requirements

We set out to build Foxtrot with the following basic requirements:
  • Rapid drill-down into events, with segmentation on arbitrary fields, spanning a reasonable number of days – This is a big one. Basically, analysts and users should be able to slice and dice the data based on arbitrary fields in the data, over data retained for a reasonable amount of time.
  • Fast ingestion into the system. We absolutely cannot slow down a client system because it wants to push data to us.
  • Metrics on systems and how they are performing in near real-time. Derived from the above: basically, aggregations over the events based on arbitrary criteria.
  • Multi-tenancy for clients pushing data to it. Clean multi-tenancy abstractions right down to the storage layers, both for the query and raw stores, so that multiple teams can push and view data without worrying about stability or their data getting corrupted by event streams from other systems.
  • Consoles that are easy to build and share. We needed a re-configurable console that teams could customize and share with each other. The console should not buckle under the pressure of millions of events getting ingested and many people having such consoles open to view real-time data.
  • SQL interface to run queries on the data. Our analysts requested a SQL-ish interface to the system so that they can easily access and analyze the data without getting into the JSON DSL voodoo.
  • CSV downloads of recent data. This, again, was a request from analysts, as they wanted access to the raw events and aggregated results as CSV.
  • Extensibility of the system. We understood that teams would keep requesting newer analytics and we would not be able to ship everything together at the start. We had to plan to support quick development of newer analytics functions without having to touch a lot of the code.

Technology Used

Foxtrot unifies and abstracts a bunch of complex and proven technologies to bring a simple user experience in terms of event aggregation and metrics. Foxtrot uses the following tech stack:

  • HBase – Used as a simple key-value store; saves the data served out for queries and acts as the raw store for long-term map-reduce batch jobs.
  • Elasticsearch – Used as the primary query index. All fields of an event are indexed for the stipulated time and then TTL out. It does not store any documents, only row keys. We provide an optimized mapping in the source tree to get the maximum performance out of Elasticsearch.
  • Hazelcast – Used as a distributed caching layer; caches query results for 30 secs. Cache keys are time-dependent and change with the passage of time, so older entries get TTLd out.
  • SQL parser – For parsing FQL and converting it into Foxtrot JSON queries.
  • Bootstrap, jQuery, HTML5, CSS3 – Used to build the console.

Architecture

The Foxtrot system uses a bunch of battle-tested technologies to provide a robust and scalable system that meets the various requirements of teams and allows for development of more types of analytics functions on top of it.

Foxtrot Architecture

  • During ingestion, data is written to the Elasticsearch quorum and to HBase tables. Writes succeed only when events are written to both stores. HBase is written to before Elasticsearch, so data written to HBase does not become discoverable until it is indexed in Elasticsearch.
  • During a query, the following steps are performed (a rough sketch follows this list):
    • Translate FQL to the Foxtrot JSON/native query format.
    • Create a cache key from the query.
    • Return results if found for this cache key.
    • Otherwise, figure out the time window for the analytics.
    • Translate Foxtrot filters to Elasticsearch filters.
    • Execute the query on Elasticsearch and HBase, and cache the results in Hazelcast if the action is cacheable and of reasonable size.
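
A rough sketch of that query-side caching, assuming the Hazelcast 3.x API; the class and method names (QueryEngine, runOnElasticsearch, cacheKeyFor) are hypothetical illustrations, not Foxtrot's actual classes:

//Sketch: time-bucketed query cache backed by Hazelcast
import java.util.concurrent.TimeUnit;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;

public class QueryEngine {
    private final IMap<String, String> cache;

    public QueryEngine(HazelcastInstance hazelcast) {
        this.cache = hazelcast.getMap("query-cache");
    }

    public String execute(String jsonQuery) {
        // Cache keys include a time bucket, so they change with the passage
        // of time and stale entries simply stop being referenced.
        String cacheKey = cacheKeyFor(jsonQuery);
        String cached = cache.get(cacheKey);
        if (cached != null) {
            return cached; // served from Hazelcast
        }
        String result = runOnElasticsearch(jsonQuery);
        // Cache for 30 seconds, matching the description above.
        cache.put(cacheKey, result, 30, TimeUnit.SECONDS);
        return result;
    }

    private String cacheKeyFor(String jsonQuery) {
        long timeBucket = System.currentTimeMillis() / 30_000;
        return jsonQuery.hashCode() + ":" + timeBucket;
    }

    private String runOnElasticsearch(String jsonQuery) {
        return "{}"; // placeholder for the actual Elasticsearch/HBase execution path
    }
}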

Misc

  • We try to avoid any heavy lifting during document ingestion. We recommend, and internally use, the /bulk API for event ingestion.
  • We have built-in support for discovery of Foxtrot nodes using the /cluster/members API. We use this to distribute indexing requests across hosts, rather than sending data through a single load-balancer endpoint. We have a Java client library that can be embedded in client applications for this purpose. The Java client supports both disk-queue-based and async/direct senders for events.
  • We optimize the queries for every analytics function by controlling the type of Elasticsearch queries and filters being generated. Newer analytics can easily be added to the system by implementing the necessary abstract classes with proper annotations. The server scans the classpath to pick up new annotations, so it is not mandatory for an analytics function to be part of the Foxtrot source tree.
  • At runtime, the time window is detected and queries are routed to the indexes relevant to the query period. For efficiency, always use a time-based filter in your queries. Most analytics will automatically add a filter to limit the query; details can be found in the individual analytics wiki pages.
  • We have added a simple console (the Cluster tab of the console) with relevant parameters that can be used to monitor the health of the Elasticsearch cluster.

Conclusion

Foxtrot has been in production for the better part of a year now at Flipkart and has ingested and provided analytics over hundreds of millions of events a day, ranging over terabytes of data. Quite a few of our teams depend on this system for their application-level metrics, as well as for debugging and tracing requirements.
Foxtrot is being released with Apache 2 License on github.
The source, wiki and issue tracker for Foxtrot is available at: Foxtrot github repository.
We look forward to you taking a look at the project and actively using and contributing to it. Please drop us a note if you find Foxtrot useful. Use GitHub issues to file issues and feature requests, and pull requests to send us code.
Special thanks to the Mobile API, Checkout and Notifications teams for their support. Many thanks to Regu (@RegunathB) for providing some very important and much-needed feedback on the usability of the system and the documentation as a whole, and for guiding us meticulously through the process of open-sourcing this.

Aesop Change Propagation System

We have open sourced Aesop, the Flipkart change propagation system, following the announcement of intent at slash n – the Flipkart tech conference. This post describes the use of Aesop in one of our production systems.

PayZippy

PayZippy is an online payment solution launched by Flipkart. It is a safe and easy payment solution for e-commerce users and online merchants.

Need for change event propagation

It is fairly common for products and services to use relational databases to store business-critical data. These data stores then become the source of truth for such data.
However, relational databases may not scale well for all kinds of use cases – analytics, reporting, search indexing, etc. – that need this data in secondary data stores. Some of these secondary data stores are non-relational and are efficient at handling such use cases. There is then a need for a system to transfer data from the primary data store to these secondary data stores.
A number of tools and products in the ETL (Extract, Transform, and Load) space may be used to transfer data. However, these are batched and do not transfer data in real time.
At PayZippy, data from these secondary stores is used to feed more than just business decisions. It feeds into real-time use cases like consoles, fraud detection and monitoring systems.
We therefore needed a system that can transfer data across data stores in real time.

Introducing Aesop

Aesop is a keen observer of changes that can also relay change events reliably to interested parties. It provides useful infrastructure for building Eventually Consistent data sources and systems.

Overall Architecture


The main components are the following:
Relay Server

  • Reads changes from source data sources, converts the changes to a serializable form using the schema registry, and stores the events in an internal buffer.
  • Listens to requests from clients and transports events to them, providing each client with only those events pertaining to the sources and partitions for which it has registered.

Relay Client

  • Single Client: calls the Relay Server to check for new events. On receipt of events from the server, it executes business logic – e.g. writing to a destination data store – and checkpoints the SCN. If it falls off the relay, it connects to the Bootstrap for events and reconnects to the Relay once it catches up.
  • Participant in a Cluster Client: calls the Relay Server for events only for the partitions assigned to it. It processes cluster status change events from Helix and acts accordingly, checkpointing the SCN to shared storage.

Bootstrap

  • Bootstrap Producer: similar to a client. It checks for new data change events on relays and stores those events in a MySQL database, which is used for bootstrap and catch-up for clients.
  • Bootstrap Server: similar to the Relay Server. It listens for requests from Databus clients and returns long look-back data change events for bootstrapping and catch-up.

High Availability, Load Balancing and Scaling

High availability, load balancing and scaling are built into Aesop. Since PayZippy relies on the data moved via Aesop for real-time use cases, these were essential to the strategy.

Client Clustering

  • High availability and load balancing on the clients are achieved by having partitioned clients participate in a cluster, with each client handling a partition (part of the data set). The partitions are dynamically assigned via Helix. The default partition function used to filter the events is MOD (see the sketch after this list).
  • Partition reassignment when instances join or leave the cluster is automatic, and the load (number of partitions assigned) is uniform across instances.
  • The filtering itself is performed at the Relay and Bootstrap. Checkpoints for partitions are stored in ZooKeeper. This uses the partitioning functionality available in LinkedIn Databus.
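
As referenced above, a minimal sketch of MOD partitioning; the use of a numeric event key here is an assumption for illustration:

//Sketch: an event belongs to partition (key mod totalPartitions); a cluster
//client only processes events for the partitions Helix has assigned to it
import java.util.Set;

public final class ModPartitioner {
    private final int totalPartitions;

    public ModPartitioner(int totalPartitions) {
        this.totalPartitions = totalPartitions;
    }

    public int partitionFor(long eventKey) {
        return (int) Math.floorMod(eventKey, (long) totalPartitions);
    }

    // A client assigned partitions {2, 5} would filter events like this:
    public boolean shouldProcess(long eventKey, Set<Integer> assignedPartitions) {
        return assignedPartitions.contains(partitionFor(eventKey));
    }
}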

Relay – HA

High availability of the Relay Server is achieved using one of the following approaches:

Multiple Relay Servers reading from the Source Data Sources

  • The Clients connect to Relay Server via a load balancer.
  • Since the requests from clients are over HTTP one of the Relay Servers or both can be serving the request based on the configuration in the load balancer.
  • When one Relay goes down, the other can still handle requests.

Relay Chaining with HA using Leader Follower model (Not yet implemented)

  • A Relay Producer reads from another Relay; this Relay can act as a normal Relay for clients.
  • Relays have both producers – a Source Producer and a Relay Producer – but only one is active at a time. On the leader Relay, the Source Producer is active; on follower Relays, the Relay Producer is active.
  • Leader/follower election is done using Helix.

Refer to the Aesop GitHub wiki for more information on Aesop's architecture and design.

Aesop at PayZippy

Blocking Startup Bootstrap

We needed to transfer existing data from MySQL to the destination data stores. We assumed we could use the same Relay Server and Client mechanism by pointing the Relay at the first bin log.
However, we faced the following issues:

  • The throughput of the client was dependent on that of the destination data store, which turned out to be far less than that of the Relay; the Relay was twice as fast as the client.
  • The clients would fall off the Relay. The buffer in the Relay does not help, as the Relay throughput is far higher than the client's; the buffer fills up and starts overwriting events even before the client has been able to pull them.

Solution : Blocking Startup Bootstrap

  • Thin Producer: a producer, similar to the one in the Relay, that can pull events from the source (or have events pushed to it by the source). Processing related to serialization is skipped.
  • Partitioned Consumers: consumers consume only their particular partition, in order, and execute business logic – in this case, inserting into the destination data store.

Consumers

The consumer currently supported in PayZippy writes to a destination data source – e.g. denormalized MySQL or HBase. The consumer calls the Event Mapper and Transformer to get an event conforming to the schema of the destination data source. The transformed event is then written to the destination data source.

Monitoring and Alerting

Monitoring and alerting are important, as the change propagation system is used for real-time use cases. They are supported using:

  • JMX Monitor (soon to be open sourced) connects to the JVMs running Aesop, fetches metrics and publishes them to StatsD/Graphite. It also raises alerts based on configured thresholds.
  • Skyline (changes by Flipkart are not yet open sourced) is used to raise alerts based on the configured algorithm.
  • Aesop Dashboard – provides a real-time view of the change propagation pipeline.

Performance

We ran a few benchmarks for Aesop on standard 4-core, 16 GB virtual machines running 64-bit Debian Linux, with the Aesop components on separate VM instances. Performance numbers from the benchmarks:

Relay Server: handles 19 million events an hour per data source (about 5,300 events/sec), i.e. 18.2 GB per hour (roughly 1 KB per event).
Relay Client: can process data at the same speed as the server; however, it is limited by the capability and speed of the end database. For the MySQL client, we are able to insert/update events at a rate of 10 million events an hour.
Latency: the latency between the source data source and the destination data source is within 80-90 milliseconds.

Appendix :  Design Decisions

Dual Writes v/s Log Mining

Of the various ways of having data in multiple stores, two were evident.
Dual Writes: the application writes to the destination data stores, synchronously or asynchronously. The application can also write to a publisher-subscriber system in which the subscribers are consumers that eventually write to the destination data stores.

  • Pros: Appears easy – the application can publish the same event that is being inserted/updated in the primary data source.
  • Cons: Difficult to maintain consistency
    • Updates with a non-primary-key where clause: for bulk updates of this kind, the application would then have to fetch the affected records and publish the changes.
    • Difficult to ensure all applications use the common data layer that publishes changes as well.
    • Manual changes in the primary data store will be missed.

Log Mining: a separate application/service extracts changes from the database commit logs and publishes them. This uses the same approach the database itself uses for replication.

  • Pros: Consistency can be guaranteed, as changes are read from the commit logs (the bin log in the case of MySQL).
  • Cons
    • Appears tough – but definitely possible.
    • Tied to the mechanism used by the database for replication, to the commit log format, etc. A tightly coupled approach.

Since consistency across datastores is of paramount importance to a financial system like PayZippy, we chose the log mining approach.

Approaches to Log Mining – Bin Log Parser vs Storage Engine

MySql Bin Log Parsing

  • Pros: Familiar approach
    • Open source software was available that parsed MySQL bin logs: Open Replicator and Tungsten Replicator.
  • Cons
    • If the format of the bin logs changes, the parser has to change.
    • Open Replicator supported MySQL 5.5; we would have to modify it to support MySQL 5.6 and the checksum feature introduced in 5.6.

Custom Storage Engine

  • Pros: Independent of the binlog format; the layers above the storage engine take care of parsing.
  • Cons: Unfamiliar approach, unknown pitfalls.

We decided to go with the known pitfalls and picked the bin log parsing approach.

Palette Image Generation – Technology to the Rescue

In this blog post, I’ll describe a new approach to palette image generation for products sold on Flipkart.

Palette images on Flipkart.com

Palette images are the small colored boxes that represent the colors of a product. You can see them in small groups below the product image on a browse page, or to the right of the product image on a product page.

The shades of the color in these images are the colors from the actual product.

For verticals (product groups) like apparel, shoes, bags, etc., we need to inform the customer that alternate colors are available for a particular product. Palette images are an efficient way of delivering this information visually. Instead of reading an alternate product color as "Fuchsia" and wondering what color that could be, the customer sees the palette show it as a shade of purple.

Problem Statement

Given an image for a product on Flipkart, we should be able to generate the palette image for it automatically. This leads to two questions – how was it being done, and is automating the process even required?

So, how did it work so far?

It was being handled manually. We have a console where the product image is displayed, and one has to select a tiny section of the image that will act as the palette. We capture the coordinates of the selection, the product ID and the image URL, and put this information in a queue. The queue consumer then crops the required section from the image and processes it.

The need for automation?

The effort of generating palette images manually increases linearly with the number of products, hence it is not scalable. On a given day, we have about 2500 different products being shot (which converts to around 4k images). Generating the palettes manually from the UI takes significant time.

To top that, we now get images from marketplace sellers. None of them have palette images and we need to generate the palettes for them as well.

Given that lifestyle products are seasonal these appear in short unexpected bursts, making planning for this activity difficult.

How can this be automated?

We brainstormed through several existing techniques and why they didn’t fit the bill.

  • There are quite a few algorithms that can be used to generate a color palette out of an image; most of them involve some form of color quantization. This paper describes one such approach. What these would give us is something like this:

  

All the quantization-based algorithms generated palettes of the entire image. As it turned out, the background color dominated the palettes in most cases. But we are not interested in the palette of the entire image, only in the product inside the image!

  • That seemed like a solvable problem: remove the background from the image and then use the technique from step 1. This too did not work out. Even if the background is removed, we are still left with an image that contains the model. Also, a product image will usually have the model wearing some other kind of clothing to give a better sense of the product. Now we not only have to recognise and remove the hair, eyes and skin tone from the image, but also need to distinguish the product from the other products in the image! For example, the product here is a scarf: the background, hair, skin tone, earrings, t-shirt, jeans and wrist-band all need to be recognised and discarded for us to even get to the relevant portion of the image!

  • Even if we managed to select the scarf through some combination of AI and image processing, we would still have other problems.

New Approach

The importance and increased scale of the problem made us take a step back and have a re-look. We eventually decided that this was a difficult problem to solve purely from an image-processing point of view, and that we really did not need to do it that way.

Apart from the product image itself, is there any other information that can be leveraged? As it turns out, every product in the catalog has a color property that can be used. Given a product ID, the product catalog system’s API returns the colors of the product.

For example, a shirt can have the color property red and blue. These properties by themselves are not very useful; combined with the product images, however, they can be. The colors themselves cannot be directly used in the palette, because the color red will be a different shade in different products, and the palette needs to show a shade that is actually in the product image.

The approach we settled on:

To start with, all the colors in the product image have equal probability of being the palette color of the product. Then we get the color properties from the backend.

The probability of any color from the image being the palette color slides up or down depending on its closeness to the color property. The closer the colors are, the higher the probability.

What I'll do here is list the steps along with the commands, so that you can run the same commands and try the output. I've used ImageMagick with the im4java library for image processing, and the Catalano library for calculations.

  • Color-quantize the image to reduce the number of unique colors. Before and after color quantization, an image looks like this:

There are various methods of color quantization and you can use any of them. For this example I’ve used the following command:

convert nonquantized.jpeg -colors 16 -depth 8 quantized.jpeg
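
Since the post uses im4java, the same step can be driven from Java; this is a hedged sketch, with the operation mirroring the command above:

//Sketch: convert nonquantized.jpeg -colors 16 -depth 8 quantized.jpeg via im4java
import org.im4java.core.ConvertCmd;
import org.im4java.core.IMOperation;

public class QuantizeExample {
    public static void main(String[] args) throws Exception {
        IMOperation op = new IMOperation();
        op.addImage("nonquantized.jpeg"); // input image
        op.colors(16);                    // reduce to 16 unique colors
        op.depth(8);                      // 8 bits per channel
        op.addImage("quantized.jpeg");    // output image

        new ConvertCmd().run(op);         // shells out to ImageMagick's convert
    }
}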

  • Generate a histogram of the image and find the RGB values of its top colors (we take the top 8). We make two assumptions now:
    • the color(s) in the palette are amongst these top colors;
    • these top colors start out with an equal probability of being the palette color.

The Command for this is:

convert quantized.jpeg -colors 16 -depth 8 -format "%c" histogram:info: | sort -nr | head -n8

and the output is a list of the top 8 colors with their pixel counts.

  • For the product, get the color properties of the image and convert those colors to their respective RGB values. For the above product, the color properties are grey and pink; the respective RGB values are #808080 and #ffc0cb.
  • Now we need to find the closeness of each of the top 8 colors to each of the color properties. Unfortunately, the Euclidean distance between two RGB colors does not correlate with the human-perceived distance between them. We need to switch to a color space that is perceptually uniform. For our case, we decided to use the CIELAB color space and the DeltaE formula (refer to the first answer on this SO thread for more info); a standalone sketch of this computation appears after this list.

The figures referenced here (an RGB cube and a CIE 1976 (LUV/Lab) chromaticity diagram) illustrate the difference between the RGB and CIE Lab color spaces.

  • For each color in the color property, select the color from the top 8 that has the least distance to it. That color has the highest probability of being the palette color.
  • Combine the least-distance colors for each of the color properties and generate a palette image out of them. The output palette for the above example:
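
Here is the standalone sketch referenced earlier: the standard sRGB-to-CIELAB conversion and the CIE76 DeltaE distance implemented directly. The post used the Catalano library and does not specify the exact DeltaE variant; this sketch assumes CIE76 and avoids the dependency:

//Sketch: sRGB -> CIELAB conversion (D65) and CIE76 DeltaE distance
public final class ColorDistance {

    // Convert an sRGB component in [0,255] to linear RGB in [0,1].
    private static double linearize(int c) {
        double v = c / 255.0;
        return v <= 0.04045 ? v / 12.92 : Math.pow((v + 0.055) / 1.055, 2.4);
    }

    private static double f(double t) {
        return t > 0.008856 ? Math.cbrt(t) : (7.787 * t) + (16.0 / 116.0);
    }

    // sRGB -> CIELAB, using the D65 white point.
    public static double[] rgbToLab(int r, int g, int b) {
        double rl = linearize(r), gl = linearize(g), bl = linearize(b);
        double x = (0.4124 * rl + 0.3576 * gl + 0.1805 * bl) / 0.95047;
        double y = (0.2126 * rl + 0.7152 * gl + 0.0722 * bl) / 1.00000;
        double z = (0.0193 * rl + 0.1192 * gl + 0.9505 * bl) / 1.08883;
        double fx = f(x), fy = f(y), fz = f(z);
        return new double[] { 116 * fy - 16, 500 * (fx - fy), 200 * (fy - fz) };
    }

    // CIE76 DeltaE: Euclidean distance in Lab space.
    public static double deltaE(double[] lab1, double[] lab2) {
        double dl = lab1[0] - lab2[0], da = lab1[1] - lab2[1], db = lab1[2] - lab2[2];
        return Math.sqrt(dl * dl + da * da + db * db);
    }

    public static void main(String[] args) {
        double[] grey = rgbToLab(0x80, 0x80, 0x80); // color property "grey" (#808080)
        double[] pink = rgbToLab(0xff, 0xc0, 0xcb); // color property "pink" (#ffc0cb)
        System.out.printf("deltaE(grey, pink) = %.2f%n", deltaE(grey, pink));
    }
}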

  

Disadvantages of this approach:

  • Reliance on the correctness of the backend data: if the color properties stored in the backend are not accurate, this solution goes for a toss.
  • Each color property is mapped to a specific RGB value, and in some cases the mapping might need to be manually updated (peacock, navy, skin, etc.). Some properties cannot be mapped to an RGB color at all (multicolor, etc.). But this mapping is a one-time process that seems good enough.

Advantages of this approach

  • It is several orders of magnitude faster than the manual approach.
  • The size of the palette image generated using this method is smaller than one generated by cropping the image, because it has no color gradients and can be compressed efficiently. For this product, the left palette image is the manually cropped one that is live on the site, and the second is auto-generated. The manually generated one is 529 bytes and the auto-generated one is 301 bytes – an average saving of almost 50% across all images.


  • The auto-generated palette is guaranteed to have a 1:1 aspect ratio, ensuring pixel-level consistency across the site, whereas for the manually generated one the aspect ratio depends on the cropping.
  • It generates a better palette image for products that have a fine design all over, like the ones below.

Example1 :

http://www.flipkart.com/identiti-casual-sleeveless-animal-print-women-s-top/p/itmdqw5hfghx85m7?pid=TOPDQW5HCH3FXZBB

 

Example 2: http://www.flipkart.com/bhavi-striped-art-silk-sari/p/itmdtysefzedpjyp?pid=SARDTYSEGZCCYE2X&srno=b_5&ref=a3ce0f80-bae3-4e51-abad-83d38ce2ec75

                

  • It generates better palettes for multicolored products, which would otherwise need the image to be cropped at the intersection of the colors – which can prove tricky, as below.

www.flipkart.com/puma-striped-men-s-polo-neck-t-shirt/p/itmddh3gf9krfbqd?pid=TSHDCFPYCGDCVAUK

(observe carefully to see that the top ⅓ is white)

  • The seasonal distribution of products does not impact it at all. Even if 500K new products arrive in a day, we do not have to wait weeks for someone to sit and generate these images.

The feature is now live in production in a pilot phase and seems to work with significant efficiency. Questions welcome!

slash n: intelligence @ scale

Let me start this blog with a note of thanks – to you, the engineer! Whether it's an engineer in San Francisco who open sourced a framework, an engineer in Beijing who came up with a new technique, or an engineer in Bangalore whose commits increased Hadoop's performance by 10% – it's your work that allows us to focus on solving our problems rather than being distracted by solving everything under the sun. Flipkart wouldn't be here without the support of the open source tech community worldwide.

For us, slash n, our flagship technology event, is a celebration of that liberating engineering philosophy of sharing. Building India’s largest e-commerce platform, we’ve learnt a thing or two, created a few pieces of technology, and figured out what techniques/processes work and what don’t. We are committed to share our learnings and to open up our technologies and platforms to the community. slash n is a forum for us to do so and also the forum to learn from the experience of others in the industry, who share the same philosophy about technology.

On Mar 7th, we had the 2nd edition of slash n, and what a day it was! Over 500 engineers participated in the event, of which half were external invitees. That's more than double the number of participants we had last year. Considering it was still an invite-only event, the interest, enthusiasm and participation shown by the tech community inside and outside of Flipkart was beyond expectations. Unlike last year, when most of the talks were by Flipkart engineers, this year more than half of the talks were by external speakers from a diverse set of organizations. Here are some highlights of the day:

  • The day started with Amod, head of technology at Flipkart, reinforcing the importance of sharing in the technology community, and slash n as our means of doing that. He also committed to open sourcing a few key technology elements at Flipkart over the next 2 quarters, in particular RestBus (messaging system for transactionally consistent service interactions), Aesop (change propagation system) and some of our mobile app technologies.
  • In his keynote, Sachin, founder and CEO of Flipkart, outlined the journey of eCommerce in India, the significant problems that got solved, what challenges lie ahead and how technology can address those challenges in future.
  • This was followed by two talks from very different areas – molecular biology and mobile ad network. Ramesh from Strand Life Sciences talked about how they are using advances in computing to make genetic disease discovery available at sustainable cost to everyone. Rajiv from InMobi talked about their efforts around mining large amount of user data to provide better mobile ads. It was interesting to note that uniquely identifying users across devices, apps & browsers remains the holy grail of personalization on the internet.
  • Some of the most popular talks (based on how many people added them to their schedule using the slash n mobile app) included:
    • Art of Promising by Yogi & Vikas from Flipkart, which peeled back the layers on how Flipkart makes and keeps its availability and SLA promises to customers.
    • Soothsayer @ Flipkart by Ananda and Mohit, which covered the internals of Flipkart's demand forecasting and inventory planning system.
    • The cataloging talk by Utkarsh and Abhishek from Flipkart, on the evolution of our catalog from books to 30+ million items today, and how the team addressed issues of scale, availability and agility along the way.
    • Job Scheduling in Hadoop by Joydeep from Qubole, which detailed issues around Hadoop job scheduling as well as his experience of building the Fair Scheduler and the Corona job scheduler.
  • Participants loved the newly introduced fire talks – quick 15-minute discussions on very focused tech topics.
  • Another highlight of the day was a panel discussion on the hope, hype and reality of big data analytics, which saw healthy debate among data scientists from diverse organizations like Flipkart, IBM, Xurmo, UIDAI and Mayin, all trying to use big data analytics to solve problems in different domains.
  • Twitter was abuzz throughout the day with participants tweeting their learnings, questions and discoveries at #slashn.

The atmosphere in the event was quite electric with interesting talks and engaging debates, which often continued between speakers and participants beyond the talk.

IISc Bangalore was the venue of slash n. Keynote address by Sachin Bansal.

Guest talk by Ramesh Hariharan from Strand Life Sciences. Panel discussion on big data analytics.

The focus of this year's event was 'Intelligence @ Scale'. The theme encapsulates what we are trying to do from a technology perspective at Flipkart, and the effort was to share our learnings in this direction and to learn from others. We believe that large scale can become a strategic differentiator if we use it to continuously make users' lives better. This can happen when the large amount of data generated by user activity is used, in real time, to improve the user experience via systems that learn continuously from each interaction. slash n saw engineers from fields as diverse as e-commerce, molecular biology, education, social sciences, mobile ad networks and cloud infrastructure talk about their approaches to building 'intelligence @ scale' in their respective domains.

Tech bonding at scale! Full room means good engagement

The day re-established our core belief that knowledge and technology are meant to be shared, and that doing so creates a virtuous cycle of innovation and progress, not only for us but for the entire ecosystem. I hope everyone who participated had some key takeaways in terms of learning (and a few more connections on their favorite social network); those who could not can still watch recordings of all the talks on the event website http://slashn.flipkart.net/

We would like slash n to evolve into a more democratic and open platform to share knowledge and possibly to collaborate on building technologies for the future. See you all at next year's event – let's celebrate this freedom at a grander scale and collaborate more deeply to solve problems that matter.

HostDB

This article is in two parts. The first is the announcement of HostDB, a new tool to help manage data center inventory and write applications around it. The second part is a back story about why we needed this tool, with all the grimy details. If you're the impatient type, you can read the announcement and description of HostDB and skip the rest.

Announcing HostDB

Today, we're releasing HostDB as an open source project on GitHub. HostDB is our attempt to solve the problem of finding hosts and their purposes in a large environment. HostDB acts as a single source of truth about all physical and virtual servers and is used to define their purpose. It helps us group our servers through tags, and all the software written by the operations team revolves around HostDB. HostDB acts as the centralized configuration store for all sorts of information.

Hosts

Any host that exists is created inside HostDB upon birth and has information about itself in YAML. This info can be hardware details (amount of CPU/RAM), network details (FQDN, IP address, rack, switch, physical location) or function – e.g. what application software the host needs. The YAML can contain just about anything; it can vary across hosts and can be ever-evolving. A hypothetical host entry is sketched below.
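
For example, a host entry might look like the following YAML; the keys shown are illustrative rather than a fixed schema (though the CLI examples later in this post do read a Network/IP key):

# Illustrative host entry; keys are examples, not a fixed schema
Hardware:
  CPU: 8 cores
  RAM: 24GB
Network:
  FQDN: web01.nm.flipkart.com
  IP: 10.0.12.34
  Rack: R12
  Switch: sw-12a
Function: nginx frontend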

Tags

Hosts are grouped together with tags. Similar to a host, a tag also has information about itself in YAML; this information applies to all hosts that are members of the tag. E.g. a tag called VM can be applied to all virtual machines and can be used to define properties shared by all VMs. To be useful, a tag must have member hosts.

Versioning

HostDB provides versioning and the ability to roll back to a previous version for each and every host or tag.

API

The above concepts may look simple, but they can be used in extremely powerful ways with an API – and that's exactly what HostDB provides. HostDB provides a REST API that can be used to create hosts, get the members of a particular tag, etc. We use this feature to automate just about everything at Flipkart: creating virtual hosts, creating DNS records, automatic monitoring and escalations, and building automated infrastructure. Services can do periodic lookups to HostDB and keep themselves updated on changes.

 

User Interfaces

HostDB provides a variety of user interfaces which can be used to interact with HostDB by applications and users.

  • Web Application
  • Rest API
  • Command Line Interface
  • HostDB::Client Perl Module

Some example use of HostDB

DC HostMap

We use HostDB to create a dynamic map of hosts in our datacenters, which the siteops team uses to easily map the physical location of a machine inside the data center to its hostname, or to find which physical machine a virtual machine resides on. The map is a view of the details in HostDB, built using the API. It changes as hosts are created or destroyed, or as virtual machines move from one physical machine to another. This also gives visibility to everyone: devops, developers and siteops can all speak the same language when talking about a host.

 

DC Map

Kloud

Kloud is both the name of Flipkart’s private cloud as well as the Virtualization software that drives it.

Kloud

When a machine is created, Kloud creates a HostDB entry for the host. Many services – DNS, monitoring, Puppet, FAI – have HostDB clients that actively listen for new member additions to their specific tags. Once they find a new host, they spring into action and perform the tasks required to bring up the machine: the Puppet HostDB client adds a node entry for the host, the DNS client creates an entry in the zone file, the monitoring client adds a Nagios entry, and so on.

HostDB is now available on GitHub, so go fork it, make some changes and let us know what you think! We think it's written in a way that will allow it to be used outside of our specific use case, and it might enable some fun projects.

 

HostDB: The Details

Sometime in 2011, while Flipkart was growing at an exponential scale, the entire operations team consisted of four people. We constantly struggled to allocate hardware and make it production-ready. Cloning machines using FAI, adding monitoring, adding DNS entries, etc. were all routine, pre-defined tasks that we felt could be automated very easily. We were tracking all of this with a shared Google spreadsheet that was never kept up to date. Many a time, existing machines were allocated twice or, more disastrously, re-cloned by mistake. Surely, there was a better way.

At the same time, we were also thinking about a virtualization strategy; the open source options available at the time did not make any waves for us, so we decided to write our own, something we call Kloud. It was in these discussions that we thought about the life cycle of a machine, and how, instead of a centralised datastore keeping machine info, we really needed an application that could talk to other applications about the purpose of a host and its properties.

We looked at all the available options and were disappointed. We decided to write something that was not just a source of truth about a host, but that interacted with the production environment and lived in it. Because we wrote it to automate infrastructure problems, host state and host properties come naturally to HostDB. We kept availability, scalability and reliability as the most important features of HostDB. As it turns out, HostDB scales wonderfully to thousands of clients.

Since we were such a small team, we were constantly involved in firefights and didn't have any time to manage new services. We didn't want to write something that depended on external services like ZooKeeper or MySQL, so we decided to keep all the data in text files; "if it doesn't scale, we'll change it later" was the policy. We also wanted to future-proof it, and so stayed away from any complex file formats. The first prototype was written in Python by Abhishek. Both Krishnan and I refuse to read Python, and Krishnan rewrote the whole thing in Perl one night. A year later, another rewrite by Jain Jonny incorporated the concept of multiple namespaces and made the code much more modular. We've been running this version in production for over a year now.

 

HostDB: Key/Value Store with a difference

HostDB is a key-value store; keys are grouped into namespaces based on type. 'hosts' is a special namespace that is central to all other namespaces. Keys inside 'hosts' are server names, and their values contain mostly information and configuration details needed by the server itself or by the applications it runs.

Applications can create unique keys (tags) that are application-specific. You can add servers as 'members' of these keys, which in turn lets you treat a key (tag) as a group of hosts.

HostDB provides namespaces: you can create keys (tags) that exist in a specific namespace and are access-controlled, so only member applications can read/write keys of that namespace. One can create access controls for each namespace, or even for each key.

HostDB uses plain text files as its storage. Namespaces are represented as directories, and keys are files inside the namespace directory. These files contain a key's config in YAML.

The 'members' are also stored in files, in a subdirectory of the namespace's directory. The access permissions on the namespaces are stored in another subdirectory.

GIT

The complete file structure of HostDB is in a git repository, and git handles the versioning and transactions for HostDB. Leveraging git means that we have a simple transactional store that offers history as well as versioning. We can go back to a previous version of a host or tag config at any point in time.

Web based Interface

HostDB provides a web-based interface for users to interact with it. Here are some screenshots:

Command Line tool

HostDB provides a command line tool that is extremely helpful for writing those small bash one-liners to get information out fast. Want to find all machines with 24 GB of RAM that are part of the search cluster? No problem!

Add an object:
$ hostdb add hosts/hostdb.ops.ch.flipkart.com
Add host to tag:
$ hostdb add -m "adding new member" tags/nm-prod/members/new.nm.flipkart.com
Get host IP: 
$ hostdb get hosts/hostdb.ops.ch.flipkart.com/Network/IP
Get tag members: 
$ hostdb get tags/nm-prod/members

 

API

There is a comprehensive list of all API functions HostDB provides. Look at the github page for details.

HostDB::Client Perl Module

We use Perl extensively and have a Perl module that applications can use to interact with HostDB. This module provides an object-oriented interface over the HostDB REST API.

use HostDB::Client;
my $hdb = HostDB::Client->new(\%options);        # connect to the HostDB server
my $output = $hdb->get($id[, $revision, $raw]);  # read a key, optionally at a given revision
my $output = $hdb->revisions($id[, $limit]);     # list the revisions of a key

HostDB has been central to almost all software written by devops at Flipkart and has allowed us to scale exponentially without a fuss. We hope you find it useful too. HostDB is now available on GitHub, so go fork it.

Hackday 4 – A Retrospective

Flipkart conducted the 4th iteration of its annual Hackday event on Sep 12/13, 2013 – and it was a grand success, to say the least! The event was unprecedented in terms of participation: we had 246 hackers who created a total of 96 hacks between them – and the actual number is probably higher; I personally know of several folks who were hacking away and didn't register or even showcase their hacks.

But more than just the numbers, it was the general buzz and excitement on the floor throughout both days, and the sheer quality of every single hack that astounded everyone present – and we’d like to share this with all of you.


The Event

mailer-360

The Flipkart Hackday is a 24-hour event where our Engineering Team is encouraged to “Think Outside the Box” and come up with innovative ideas, and then build a proof-of-concept of their idea. Post this, there is a presentation session where the best ideas are selected by a judging panel and rewarded with small prizes.

The event kicks off with a talk by Kiran Jonnalagadda, Founder of Hasgeek.com

The talk’s really entertaining and engaging, and everyone listens intently! Well, almost everyone … 

T-Shirts are distributed to all participants … 

… And there’s a mad scramble to get them!

The Organizing Team had come up with the 0x10 Commandments of Hacking …. cool stuff!

There’s plenty of awesome food … 

And Red Bull as well!

Which is consumed in copious amounts … 

And here’s the net result …. converting Red Bull into code!

There’s some serious hacking going on at this point … 

Some folks aren’t quite as serious yet =)

But the best part of the event was the camaraderie on display.

Some of the hacks are pretty hardcore … 

And some of them were completely off the beaten track, like this effort to resurrect the Flipkart Library!

The general atmosphere was so conducive to hacking, that even our guest speaker and judge Kiran got infected and started hacking away!

The sun sets, but the office is still packed with people tinkering away at their hacks.

We had made plenty of arrangements for the overnighters …

And people make best use of said arrangements, and start to get cozy!

Some folks try and catch a quick nap …

… while others have found alternative means to stay awake!

Morning glory, and people start winding up their hacks. The buzz begins to pick up again, as folks walk around and see what everyone else has been up to.

The first demo session begins, it’s complete madness as everyone tries to catch the judges’ attention!

The competition is so tight that some teams decide that some marketing is in order … 

The demo session has concluded, and it’s voting time! It’s especially hard for the judges, as their votes count for more, and there are so many awesome hacks to choose from!

Voting’s done *phew*

Everyone’s eagerly awaiting the results of the vote … 

For some people, it’s all too much!

Results are out! The top 15 hacks are shortlisted for the final presentation session.

Each of the Top 15 teams gets 5 minutes on stage to present their hacks to the assembled audience.

The audience listens intently … 

But the last 24 hours have completely drained some of our hackers!

Presentation session’s done! While the points are being tallied up, MC Sourav gets some alone time with the trophies … 

And the winners are announced! Congratulations!


The Judges

Our judging panel consisted of:


Saran Chatterjee (Vice President-Products @ Flipkart.com)

Sourav Sachin (Director-Engineering @ Flipkart.com)

Kiran Jonnalagadda (Founder @ Hasgeek.com)

These gentlemen had the unenviable task of selecting the best hacks amongst a veritable sea of great ones – all three judges mentioned just how difficult this was, because there were so many awesome hacks on display! Kiran Jonnalagadda (who has organized several public Hacknights under the Hasgeek.com umbrella) told us multiple times that he has never seen a Hackathon quite like this, both in terms of size and quality – so a big Thumbs Up to everyone who participated!

While every single hack on display was top-notch, we did want to reward the ones that we felt went the extra step and had that little extra _something_ which allowed it to stand out from the crowd. The judging process was divided into 2 parts: The first part was a crowd-sourced online vote by all the participants, from which we shortlisted the top 15 hacks. These 15 hacks were then presented on-stage, where the judges scored them on criteria such as originality, impact, potential to be productionized and audience appeal.


The Winners

We had several awards to give out: Best Hack (for the hack that scored maximum on the judges’ scorecard), Popular Choice (for the hack that got the maximum number of votes in the online vote) and several smaller category awards such as “Most Innovative Hack”, “Geekiest Hack”, “Coolest Hack”, “Laziest Hack” and “Most Useful Hack”. The list of winners is as follows:

Best Hack

Sirf Flipkart Karo | Chrome plugin that suggests Flipkart products alongside search results
Jagadish Kasi, Navni Bhojwani, Samir Bellare, Sudeep Kumar Moharana, Mayank Mittal

Popular Choice Award

ComicCon | Convert a video into a full blown comic
Jay Hasmukh Chawda, Vijayaraghavan A

Most Innovative Hack

Minority Report | Minority Report Style Analytics
Amod Malviya, Dipanjan Mukherjee


In true Hackathon style, their hack refused to work post event; and as such we had to take some creative liberties in displaying what their hack (supposedly) does =)

Geekiest Hack

Unix Flipkart | Bringing the goodness of the UNIX terminal to flipkart.com
Nikhil Bafna, Yogesh Dahiya, Pratyay Pandey

Coolest Hack

FaceIt | Login to flipkart using face recognition
Abhilash Goje, Aditya Punjani, Pavan Srinivas Bysani

Laziest Hack

Chota Minority Report | Smart file transfer
Vishnu H Rao, Aniruddha Gangopadhyay, Chetna Chaudhari S


In keeping with the award they won, the Chota Minority Report folks were too lazy to send in photos of their team and hack.

Most Useful Hack

Hackday Website | Shows the voting results on the website in real time
Ramesh Perumalsamy, Aakash Bapna



The Feedback

Amod Malviya (Senior Vice President-Engineering; and winner of the Most Innovative Hack award)

Congratulations everyone! This was massively awesome! I was blown away by the creativity of folks. Personally, I was so torn between having to vote only 3 times that I had to upvote, downvote, upvote multiple times.

Saran Chatterjee (Vice President-Products; and one of the judges of the event)

Kudos to the organizing team plus congratulations to all hack teams. In my books you all are winners! This was my first Hackday here in Flipkart and I was really impressed with the quality of hacks. My goal now is to work with you all and to make sure I provide the support (where needed) to get some of these prioritized into the roadmap quickly. Can’t wait for it to happen!

Kiran Jonnalagadda (Founder of Hasgeek.com; and one of the judges of the event)

I was pleasantly surprised by how deeply integrated the hacks were with Flipkart’s technology. At public hackathons participants almost always build something unrelated to their day job. Most of those hacks aren’t meant to be anything more than an expression of creative energy, abandoned shortly after the event.

The Flipkart hacks were different. Nearly every one was built on top of existing Flipkart tech and was meant to address a very real problem that the participants had been mulling over for a while. They had a clear sense of the solution and a deep understanding of the platform and where to plug in each piece.

I saw five broad categories of hacks:

1. At the DevOps level, (a) better logging, processing of data streams and reporting, making it possible to understand how users are using the site and how the infrastructure is keeping up, and (b) better tooling for developers to try new ideas.

2. Workflow improvements helping fix gaps in Flipkart’s existing procurement and fulfilment.

3. Front-end tweaks, giving users an immersive content-rich pathway through the site to help them make better purchase decisions.

4. Access to Flipkart data beyond the Flipkart website, allowing users to perform comparison shopping, to access Flipkart from alternate interfaces, and to take Flipkart to social networks.

5. Fun hacks, presented as workplace quality improvements, but really just developers blowing off steam.

GraceKelly: A best effort cache synchronization library for distributed systems

GraceKelly is a best effort cache synchronization library for distributed systems.

In this blog post I’ll explore the motivations for such a library and give a brief introduction to GraceKelly.

GraceKelly is open source and is available here: GraceKelly on Github

A Chaotic Place

The average visitor to Flipkart is not aware of the scale at which we operate. Day in and day out we handle millions of page views, thousands of transactions, and millions of updates to many things including price, availability, offers and recommendations. Under the calm, functional facade of the website, there is a complex network of services that are constantly interacting with each other.

We have services that handle everything ranging from search to product serviceability at a given pin code.

An arbitrary web request that hits one of our web servers can spawn a plethora of requests to a bunch of back-end services, which in turn might be dependent on other services.

The back-end services respond at variable latency, and the responses are collated, interpreted and transformed before a web response is finally created and served to the user as a web page.

The variability of the requests and responses that traverse the complex network of services while being transformed, multiplexed, demultiplexed and altered makes for a chaotic environment.

Distributed Service Environment

Chaos means unpredictability and unpredictability is bad. When a user requests a page, the page load time must be predictable. When a product goes out of stock, the amount of time it takes to reflect on the product page needs to be predictable. We need predictability around service SLAs. Service SLAs are dependent on the load under which the service is operating. This means we need predictability around service load as well. We can’t operate in an environment where one minute a single server is able to handle production traffic and the next minute a whole cluster is buckling under the load. So we try to grab and hold on to as much predictability as we can, wherever possible.

Caches to the rescue

Caches act as sentinels in a distributed service environment. Although their primary function is to reduce latency, when used appropriately they excel at bringing predictability to a system. This is because a cache request is extremely predictable, with almost no variability in either response times or the load per request, thanks to the simple data access pattern of a cache lookup. If a given front-end request hits caches at all the back-end services, we can predict with high confidence the load and response latency of that request on each service. One could say that there is a positive correlation between the percentage of cache hits and the predictability of a system/environment.


Throwing a spanner in the works

Every time there is a cache miss, both our distributed environment and its SLAs become a little more vulnerable. In the face of these risks, a common pattern of cache usage seems inappropriate. One of the most common ways of updating the data stored in caches is to have an expiry ttl for every cache entry. Once this time to live expires, the cache entry is removed from the cache and is no longer accessible, until another request repopulates/synchronizes the cache. Using an expiry ttl in this way exposes the underlying system to potentially harmful request load for the duration of synchronization. Imagine a sequence of events like the following

  • t0 – a heavily requested cache entry c1 expires
  • t1 – there is a cache miss for c1 and a request is sent to the service to fulfill
  • t2 – the cache is repopulated with a fresh entry c2 (the new value for the same key)

The time between t1 and t2 is the duration of exposure. During that time, all requests for c1 that miss the cache are let through into the distributed environment. The predictability of the target service, and of all the services it depends on, is affected during this time by the per-request load and the qps of all requests that result in a cache miss for c1. Caches could be updated more gracefully than this.
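To make the exposure concrete, here is a minimal, illustrative Java sketch (not code from any Flipkart system) of the naive expire-and-reload pattern. Every request that arrives between the expiry check failing and the put() completing falls through to the backend:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Naive TTL cache: an entry is treated as absent once its ttl passes,
// so every subsequent miss goes straight to the backend service.
class NaiveTtlCache {
    private static class Entry {
        final String value;
        final long expiresAt;
        Entry(String value, long ttlMillis) {
            this.value = value;
            this.expiresAt = System.currentTimeMillis() + ttlMillis;
        }
    }

    private static final long TTL_MILLIS = 60000;
    private final Map<String, Entry> store = new ConcurrentHashMap<String, Entry>();

    String get(String key) {
        Entry e = store.get(key);
        if (e != null && System.currentTimeMillis() < e.expiresAt) {
            return e.value; // cache hit
        }
        // t1..t2: every concurrent request for an expired key lands here,
        // hammering the backend until the put() below completes.
        String fresh = loadFromBackend(key);
        store.put(key, new Entry(fresh, TTL_MILLIS));
        return fresh;
    }

    private String loadFromBackend(String key) {
        return "value-for-" + key; // stand-in for a real (slow) service call
    }
}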

Refresh don’t expire

Refreshing the cache without removing the cache entry solves the problem of exposure that cache expiry brings. In a cache refresh strategy, once a value is cached, all requests for the value are served out of the cache and don’t hit the service/services at the back-end. Periodically the cache is synchronized with values from back-end services to keep the data up to date and consistent with back-end systems. This means that for all the values that are cached, the load on the back-end systems is extremely predictable. At the same time, the response latencies are highly predictable for these cached values.

Many services/systems would be better served by refreshing the cache rather than expiring it. The efficacy of such a strategy depends on the kind of service in question. For services that have zero tolerance for stale data, best effort refreshing instead of expiring the cache entry doesn’t make sense. However, many services can tolerate stale data to a certain degree. For example, a stock availability service cannot accommodate stale data, while a review and rating service can still have stale data cached for a little while.

There are some popular strategies that are used to implement a refreshing cache.

  1. Global TTL, with a refreshing process: The most common way of implementing a refreshing cache is by running a separate process or thread that periodically refreshes all the entries in the cache. The shortcoming of this strategy is that it is only appropriate where there is no variability in the staleness of data that is cached, e.g. a search engine service’s cache can be refreshed once every 30 minutes if the re-indexing happens only once every 30 minutes.
  2. Fine grained TTL, with a monitoring & refreshing process: In this strategy, a separate process or thread constantly monitors the cache entries to see which of them have expired and refreshes them accordingly. This approach gives finer-grained control over the cache refresh lifecycle for each cache entry. However, running a separate process means one more component in your environment that needs to be monitored and maintained.

What would be good to have is a cache library with regular caching semantics, but one that accommodates refreshing a cache entry rather than expiring it based on ttl. This is exactly what GraceKelly is; it is inspired by Google Guava’s LoadingCache.

Cache me if you can

GraceKelly is a best effort cache synchronization library that tries its best to refresh any cache entry that has expired. The refresh lifecycle is solely request-triggered and doesn’t monitor/maintain the cache. This means the refresh is not started when a cache entry expires, but rather when the first request for an expired cache entry is made. It is best effort because if synchronization/refresh of a cache entry fails, it can fall back to the stale version of the data already present in the cache.

For every request

  • It looks up the cache and returns the value if a cache entry is present.
  • If the returned cache entry has expired it dispatches a task to refresh the cache entry.
  • If for some reason the refresh fails, it can extend the ttl of the existing entry or do nothing.

Note that a cache entry is never removed (though it can be evicted by size constraints). This enables us to

  • Shield the backend services and systems from exposure to unnecessary request load.
  • Decouple response SLAs from backend degradation and availability concerns, thereby allowing for graceful degradation with stale data as fallback.

The Library

The GraceKelly library consists of a single class, Kelly, that takes implementations of two interfaces, a CacheProvider and a CacheLoader. They pass around a generic type, CacheEntry. A rough sketch of how these pieces might fit together follows the list below.

  • Kelly: The core of the library; it acts as a proxy to the CacheProvider and is responsible for reloading the cache using the CacheLoader.
  • CacheProvider: Interface whose implementation provides the actual caching functionality, e.g. a CacheProvider implementation for Couchbase, or a wrapper around a ConcurrentHashMap.
  • CacheLoader: Interface whose implementation reloads a CacheEntry based on the key and value of the expiring CacheEntry.
  • CacheEntry: Generic type that contains key, value and ttl information.
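Here is that sketch. The class and interface names are from the library, but the method signatures are assumptions made for illustration; the real ones are in the GraceKelly repository:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical signatures for illustration only.
class CacheEntry<K, V> {
    final K key;
    final V value;
    final long ttlMillis;
    final long writtenAt = System.currentTimeMillis();
    CacheEntry(K key, V value, long ttlMillis) {
        this.key = key;
        this.value = value;
        this.ttlMillis = ttlMillis;
    }
    boolean expired() {
        return System.currentTimeMillis() > writtenAt + ttlMillis;
    }
}

interface CacheProvider<K, V> { // wraps the actual cache: Couchbase, a ConcurrentHashMap, ...
    CacheEntry<K, V> get(K key);
    void put(CacheEntry<K, V> entry);
}

interface CacheLoader<K, V> {   // knows how to rebuild an entry from the backend
    CacheEntry<K, V> reload(CacheEntry<K, V> expiredEntry) throws Exception;
}

class Kelly<K, V> {
    private final CacheProvider<K, V> provider;
    private final CacheLoader<K, V> loader;
    private final ExecutorService refreshPool = Executors.newFixedThreadPool(4);

    Kelly(CacheProvider<K, V> provider, CacheLoader<K, V> loader) {
        this.provider = provider;
        this.loader = loader;
    }

    CacheEntry<K, V> get(K key) {
        final CacheEntry<K, V> entry = provider.get(key);
        if (entry != null && entry.expired()) {
            // Request-triggered, best-effort refresh: serve the stale value now
            // and refresh in the background. (A real implementation would also
            // de-duplicate concurrent refreshes for the same key.)
            refreshPool.submit(new Runnable() {
                public void run() {
                    try {
                        provider.put(loader.reload(entry));
                    } catch (Exception e) {
                        // Best effort: the stale entry stays in the cache as fallback.
                    }
                }
            });
        }
        return entry; // never removed on expiry; may be stale until a refresh lands
    }
}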

GraceKelly is open source and is available here with example code and documentation: GraceKelly on Github

Proxies for resilience and fault tolerance in distributed SOA

On-line Content and Transactions
OLTP systems are characterized by their ability to “respond immediately to user requests“. The common understanding of a transaction in OLTP is within the context of a database where data is read and written with appropriate, albeit varying, durability, integrity and consistency guarantees.
OLTP applications are quite varied and depend largely on domain and purpose. Volume and Variety characteristics of data are different as well. For example, consider these differences among a Banking application, an eCommerce web-site and a Social media portal. The OLTP classification of systems is therefore quite broad, but the basic premise remains : respond immediately to user requests.

Over the years, systems have started to embrace BASE over ACID, and Eventual Consistency is acceptable. Fundamental assumptions around write-time consistency are challenged (Eric Brewer on BASE vs ACID) and Availability trumps Consistency at scale. At Flipkart, website availability is treated pretty seriously as the website evolves into a platform for delivering rich, personalized and relevant content to the user.

A typical web page on Flipkart has a good mix of static content (delivered off CDN) and data sourced from a number of backend systems.

Rendering a single such page requires about 2MB of data read/write – comprising Product information, Category tree, User session, Logging and Metrics. The data volume for a single day works out to about 30TB. Delivering this reliably is hard due to the inescapable consequences of the CAP theorem. However, responding immediately to users is doable if the loss in consistency and data is statistically insignificant.

The Availability Myth
The following listing maps functionality to data stores and protocol services. Evidently, different stores are used, often with good reason. A few examples: Category information is structured and stored in MySQL; User sessions are many and are sharded across Couchbase and MySQL; Search uses Apache Solr as a secondary index; Notification data for a user exhibits temporal proximity and is stored in HBase; User Recommendations are a keyed lookup on Redis; and Metrics are stored in the OpenTSDB time series database.

Access to the varied data stores is implemented as SOA services, with the primary objectives of distribution, de-coupling and interface-defined abstraction. Each service cluster has redundant nodes and provides an availability guarantee of 99.9% or more.
Running a website that depends on 15 services, each with 99.9% availability, we get

99.9% ^ 15 = 98.5% uptime 
(the probability of all 15 services being available at the same instant of time)

This translates to 2+ hours of downtime per week ((1 − 0.985) × 168 hours ≈ 2.5 hours). In reality, it is generally worse. The cost of running an “always available” service or data store is prohibitively high, accounting for redundancies, backups, near real-time replication of data (strong consistency) and seamless failover. Again, this may be attempted with datastore software that supports all of this and really works!

Latency impacting Availability
Part of the availability problem lies in service invocation. Different styles of service access and their use among developers are depicted in this infographic:

Service access

Services are often synchronous and the invocation pattern easily translates to making an API call. The API method signature is complete w.r.t. data types, method name and errors/exceptions. Service clients handle errors intuitively, and at times are forced to by the API contract. Consequently, most of us code to handle exceptions/errors, not latency!

Handling latencies, on the other hand, is more involved and requires techniques like callbacks and implementations such as Java Futures. Programming is not straightforward as callbacks don’t compose well: sequencing and combining async calls is not easy. Moreover, there aren’t many service client libraries that do this transparently. The sketch below illustrates the problem.
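As an illustration (the service names are assumptions, not real Flipkart APIs), combining just two asynchronous calls with plain Java Futures already forces the caller to block on each result in turn and to wire up its own timeouts and error handling:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Illustrative only: combining two async service calls with plain Futures.
// Each get(timeout) blocks a thread and needs its own timeout and error
// handling, which is why latency handling often doesn't get written at all.
class PageAssembler {
    private final ExecutorService pool = Executors.newFixedThreadPool(8);

    String assemble(final String userId, final String productId) throws Exception {
        Future<String> user = pool.submit(new Callable<String>() {
            public String call() { return callUserService(userId); }       // stand-in RPC
        });
        Future<String> product = pool.submit(new Callable<String>() {
            public String call() { return callProductService(productId); } // stand-in RPC
        });
        // "Combining" the results means blocking on each future in turn.
        String u = user.get(50, TimeUnit.MILLISECONDS);
        String p = product.get(50, TimeUnit.MILLISECONDS);
        return u + " | " + p;
    }

    private String callUserService(String id) { return "user:" + id; }
    private String callProductService(String id) { return "product:" + id; }
}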

Another oft-repeated practice concerns measurement, where the emphasis is on mean and median service response times. Variance in response times at the long tail does matter at scale – for example when tens of servers handle millions of page view requests on a website. Consider the Flipkart website, which uses PHP as the front end: each web server is configured to run a fixed maximum number of concurrent PHP processes, and the number of servers is sized by the expected load on the website. Consequently, resources like CPU, memory and processes/threads are limited and shared, and each web page is served by borrowing, using and immediately returning a shared resource to the pool. Each unit of work is expected to be short-lived and execute in a timely manner. Latency build-up, however small and in only a subset of services, can impact availability and user experience as shown here:

latency affecting availability

Fault Tolerance – Fail Fast, Recover Quickly
The engineering team at Flipkart built resilience into the website technology stack by having it deal with imminent failures in upstream services. The fk-w3-agent aka W3-agent daemon was already being used successfully to scale PHP and get around some of its limitations (see slide 77 onwards in this presentation: How Flipkart scales PHP). A detailed presentation on the evolution of the Flipkart website architecture is available here: Flipkart architecture: Mistakes & Learnings.
The W3-agent was redesigned to be a high performance RPC system that could serve as a transparent service proxy. A few design principles for this new system were:

  • Prevent cascading failures – Fail fast and Recover quickly
  • Provide Reasonable fallbacks around failures – the exact behavior can be service specific
  • Support for multiple protocols and codecs in order to enable transparent proxying – Unix Domain Sockets and TCP/IP as transports; HTTP and Thrift as protocols
  • High performance runtime with low overhead – ability for a single local instance to handle hundreds of millions of API/Service calls per day

The fail fast and fallback behavior is entirely functional and implemented as alternate path logic by the respective service owner. The invocation of primary vs alternate path flow is at the discretion of the service proxy.

The Flipkart Phantom

Proxy servers & processes are used extensively as intermediaries for requests from clients seeking resources from other servers. There are different types of proxies, and one specific type, the reverse proxy, can hide the existence of origin servers: requests from clients and responses from servers are relayed back and forth in a transparent manner. The proxy also offers a runtime for implementing routing or highly localized business logic – for example, executing a custom expression to sort data elements returned by the service response.

Phantom is a high performance proxy for accessing distributed services. It is an RPC system with support for different transports and protocols. Phantom is inspired by Twitter Finagle and builds on the capabilities of technologies like Netty, Unix Domain Sockets, Netflix Hystrix and Trooper (Spring).

This design diagram depicts logical layering of the Phantom tech stack and technologies used:
Phantom tech stack
The layer abstraction in the design helps to:

  • Support incoming requests using a number of protocols and transports. New ones (say UDP) may be added as needed. Mixing different incoming (e.g. HTTP) and outgoing (e.g. Thrift) transports is also supported.
  • Create protocol-specific codecs – e.g. HTTP, Thrift. Adding a new Thrift proxy end-point requires only configuration edits; no code change is needed.
  • Automatically wrap API calls in Hystrix commands, with reasonable defaults for thread/semaphore isolation and thread pools. Users of Phantom are not required to program to the Hystrix API and can focus on implementing service calls and fallback behavior. Fallback behavior is influenced by configured parameters (timeouts, thread pool size) and real-time statistics comprising latent requests, thread-pool rejections and failure counts (see Hystrix’s support for this: How Hystrix works). A hand-written equivalent of this wrapping is sketched after this list.
  • Define an API layer for calling services. This is optional and promotes request-response data driven interfaces.
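For readers unfamiliar with Hystrix, this is roughly what that wrapping amounts to when written out by hand. With Phantom the command plumbing is generated for you; you supply only the service call and the fallback. The service and cache calls below are placeholder stubs, not Flipkart APIs:

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;

// A hand-rolled Hystrix command: primary path in run(), fallback in getFallback().
public class ProductInfoCommand extends HystrixCommand<String> {
    private final String productId;

    public ProductInfoCommand(String productId) {
        // Commands in the same group share a thread pool by default.
        super(HystrixCommandGroupKey.Factory.asKey("ProductService"));
        this.productId = productId;
    }

    @Override
    protected String run() throws Exception {
        // Primary path: the actual (latency-prone) service call.
        return fetchFromProductService(productId);
    }

    @Override
    protected String getFallback() {
        // Fail fast: invoked on timeout, error, or thread-pool rejection.
        return fetchFromLocalCache(productId);
    }

    private String fetchFromProductService(String id) { return "fresh:" + id; } // RPC stand-in
    private String fetchFromLocalCache(String id)     { return "stale:" + id; } // cache stand-in
}

Invoking new ProductInfoCommand("id-123").execute() then gives fail-fast, isolated execution, with the fallback applied automatically on timeout, error or rejection.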

Phantom is open source and available here : Phantom on Github

Phantom proxies have been used to serve hundreds of millions of API calls in production deployments at Flipkart. More than 1 billion thread/semaphore-isolated API and service calls are executed on Phantom every day. The proxy processes were monitored and found to incur a marginal increase in resource utilization, while response times remained the same at the various percentiles measured.
Phantom deployment

Responding immediately to user requests – redefining the user experience
Proxies like Phantom provide the technical infrastructure for shielding an application from latencies in upstream services in a distributed SOA. The proxies are transparent to service clients & services and therefore non-intrusive. Fallback behavior for each service, however, needs to be implemented by service owners. Also, recovering from failed transactions (if required at all) is outside the scope of Phantom. For example, email campaign hits are stored in a database, and the fallback behavior in case of database failure is to append this data to logs. Recovery of data from logs and appending it to the database is an operational activity implemented outside Phantom. Another example is displaying product information, where Phantom fails over to a local cache cluster if the Product Catalog Management System is down. This behavior can result in consistency issues: price changes and stock availability changes may not be reflected. The application, i.e. the website, and the end business processes (fulfillment of orders placed based on cache data) will need to change to redefine the user experience.

Choosing a Datastore For The Flipkart User Engagement Platform

A User Engagement Platform

Tersely defined, a “User Engagement Platform” is a platform that solicits, accepts and displays user content and actions. These collections of content or actions are usually associated with both user and non user entities.

Such a platform enables engagement paradigms like reviews, user lists, ratings, votes, comments, karma… etc. It’s easy to imagine such a system being used by millions of users, who are constantly submitting and accessing varied and rich content.

This access pattern implies near real-time updating of stats, ratings, votes, likes, plays, views etc. at scale. This usage creates loads of data, much of which is volatile and ephemeral in nature.

In short, imagine a customer engagement platform for Flipkart, with about a million unique visitors per day, most of whom are actively engaging with the site and the community. We envision our user engagement platform handling such a load pattern with ease and ready to scale. One of the most critical, operational components in such a system is the data persistence layer. Ideally, this persistence layer should be highly performant, horizontally scalable with minimal overhead, consistent, always available and partition tolerant. But…..

CAP: Or why you can’t have your cake and eat it too

The CAP theorem, or Brewer’s theorem of distributed computing systems, states that it is impossible for a distributed system to simultaneously provide all three guarantees of

  1. Consistency: All nodes see the same data at the same time.
  2. Availability: A guarantee that every request receives a response about whether it was successful or failed.
  3. Partition tolerance: The system continues to operate despite arbitrary message loss or failure of part of the system.

The implications of the CAP theorem are crucial for selecting the persistence layer of an application or service. One has to carefully analyze their respective app/service requirements and pick the appropriate guarantees from CAP that their prospective persistence layer will support.

Data access patterns for User engagement services

Historically, the data access pattern for user generated content was low on writes and heavy on reads. However, modern social engagement patterns around user content have made this pattern write heavy as well, to accommodate more likes, more ratings, more upvotes, more comments, more shares. So today’s and tomorrow’s engagement services should accommodate heavy write loads, heavy read loads, and heavy aggregate (counter) modify-and-read loads. What becomes apparent if we look at user engagement services in this way is that aggregation needs to be a first class function of engagement services, one that is near real-time, scalable and highly available.

Eventual consistency and User experience

At the same time we can also note that most of today’s engagement heavy applications tradeoff consistency for eventual consistency to achieve better scalability through horizontal partitioning and availability. The extent to which a congruent user experience can be pulled off with eventual consistency differs greatly. Reddit’s upvote user experience is a good example of using eventual consistency without it adversely affecting how the user perceives consistency on the platform.

YouTube’s “301 Views” for a video with “20000 likes” falls at the other end of the spectrum of good user experience with eventual consistency. So, with careful application and service design, effective tradeoffs on data consistency can be made without affecting the user experience. By doing this we immediately free ourselves from the “C” constraint of CAP, leaving us to explore the “Availability and Partition tolerance” guarantees which are very much desired in this context. The following section gives a brief example of the kinds of use cases that our engagement platform should support and what they imply for the persistence layer.

A playlist of the people

Imagine a community-created playlist on Flipkart’s Flyte: people add songs to a global playlist and then upvote or downvote songs added by other users. The users should have an always-on experience, and neither their submissions nor their votes should be missed. The implication here is that there shouldn’t be a single point of failure in the write path. Hundreds or thousands of users could be simultaneously upvoting/downvoting the same song, so locking of counters should be avoided and distributed counters should be preferred. Not every user needs to see the same view of the data, as the nature of the data is very transient to begin with, so eventual consistency should do fine. Given the massive amount of user engagement, the sort order of the playlist is going to change very often, so one should avoid query-time sorting and prefer data that is natively sorted. Adequate clarity around such usage scenarios enabled us to confidently transition from requirements assessment to technology selection.

Technology Selection: What we want from a persistence layer

Let us consider the attributes of an appropriate persistence layer for engagement services.

  • Availability and Partition tolerance, with tunable consistency: as discussed above we should be able to trade off on consistency to accommodate highly available and partition tolerant engagement services.
  • Linear Scalability: In the face of massive amounts of content being created by  users the system should be able to scale without degrading performance.
  • Operational Efficiency: The operational requirements of the highly available, distributed, partition tolerant persistence layer should be minimal.
  • Community Support: There should be a thriving community of users and developers that is both effective and helpful.
  • Parsable Code Base: The code base should be grokkable both in size and complexity, with either good documentation or help from the community.
  • Professional Support: It is preferable to have companies that are providing professional support for the platform.

Though this isn’t an exhaustive list, it’s a good starting point to explore different alternatives. However, there are also functional requirements of the persistence layer that must be considered.

  • Aggregators are a first class concern: Aggregator modification is potentially the heaviest write load on an engagement service, so the persistence layer should support highly performant aggregator modification over a distributed infrastructure.
  • Sorting is a first class concern: Most user generated content is going to be served in a sorted form, ranging from most recent comments (as in a news feed) to sorting by helpfulness. Sorting large amounts of data should be handled in a highly efficient manner.
  • Multiple pivot points for data elements: Each complex data entity should be accessible through its attributes, via a reverse index or filtering.
  • Offline stats with map reduce: The persistence layer should support map reduce on data natively, or should be easily pluggable into a map reduce framework like Hadoop.
  • Search integration: Text search should either be native or easily pluggable into the persistence layer.
  • Selectable/Updatable individual attributes: Attributes of a data entity should be individually selectable and updatable.
  • Schema Less: The data model should be flexible and should not impose constraints on what and how much data is stored. Schema-less data stores like columnar or key-value stores provide great data model flexibility.
  • Native support for ephemeral data: Engagement services are going to generate lots of data of an ephemeral nature, i.e. data that is important/valid only for a short period of time. Ephemeral data should not clog up the system.
  • Replication should be a first class concern: Replication and replication awareness should be deeply integrated into the design and implementation of the database.

Cassandra

Considering all the above factors, we ruled out traditional RDBMSs and ventured into the wild west of databases, aka “NoSQL”. After evaluating and eliminating a bunch of databases (document stores, key-value stores, Riak – attributes not selectable/updatable, HBase – caters to consistency and partition tolerance….) we arrived at Cassandra. Cassandra purportedly supports many of the above-mentioned functional and non-functional requirements.

The Good

  • Online load balancing and cluster growth: Cassandra is designed from the ground up to be deployed on a cluster. Growing and shrinking a cluster is transparent, and Cassandra deals gracefully with node loss in a cluster.
  • Flexible Schema: Cassandra is a column oriented database and there is inherent flexibility in the schema for the data.
  • Key Oriented Queries: All queries are key-value oriented, making the database highly performant if appropriately configured.
  • CAP aware: Cassandra is CAP aware and allows one to make tradeoffs between consistency and latency. Consistency can be configured at different levels, and can even be tuned at a per-query level (see the sketch after this list).
  • No SPOF: No single point of failure. Cassandra can be configured to be incredibly resilient in the face of massive node loss in the cluster. This is because replication is a first class function of Cassandra.
  • DC and rackaware: Cassandra was built to be datacenter and rack aware and can be configured appropriately to use different strategies to achieve either better DR(disaster recovery) resiliency or minimal latency.
  • Row and Key Caching: row level and key level caching is baked into Cassandra. This makes Cassandra a viable persistent cache.
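As a sketch of how these features serve the playlist example from earlier, here is an illustrative snippet using the DataStax Java driver (one possible client; our choice of driver isn’t implied here). The keyspace and table (playlist.song_votes) are hypothetical:

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;

// Sketch only: distributed counters with tunable, per-query consistency.
// Assumed schema:
//   CREATE TABLE playlist.song_votes (song_id text PRIMARY KEY, upvotes counter);
public class PlaylistVotes {
    public static void main(String[] args) {
        Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
        Session session = cluster.connect();

        // Lock-free distributed counter increment: hundreds of concurrent
        // voters never contend on a lock.
        SimpleStatement upvote = new SimpleStatement(
                "UPDATE playlist.song_votes SET upvotes = upvotes + 1 WHERE song_id = 'song-42'");
        upvote.setConsistencyLevel(ConsistencyLevel.ONE);  // favour availability on writes
        session.execute(upvote);

        SimpleStatement read = new SimpleStatement(
                "SELECT upvotes FROM playlist.song_votes WHERE song_id = 'song-42'");
        read.setConsistencyLevel(ConsistencyLevel.QUORUM); // stronger reads where it matters
        long votes = session.execute(read).one().getLong("upvotes");

        System.out.println("song-42 upvotes: " + votes);
        cluster.close();
    }
}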

The Bad

  • Limited Adhoc Querying Capability: Due to the absence of a query language as comprehensive as SQL, ad hoc querying is severely limited on a Cassandra cluster.
  • Tight coupling of data model: The way in which Cassandra data models are created is heavily dependent on their access patterns from the application layer. In contrast, RDBMS systems model data based on entities and their relationships. Hence, the Cassandra data model is tightly coupled with application access patterns, which means app-level feature changes will impact the data model to a large degree.
  • No Codebase stability: Cassandra is still rapidly evolving and the codebase and feature set change constantly.
  • Bad documentation: the documentation is very sparse and outdated/defunct due to the rapid pace of change.
  • Lack of transactions: Cassandra does not have a mechanism for rolling back writes. Since it values AP out of CAP there is no built in transaction support.

The Ugly

  • Steep learning curve: the steep learning curve combined with having to sift through the internet to find appropriate documentation makes Cassandra very hard to get a handle on.
  • No Operational experience: Zero or low operational experience across the organization with Cassandra when compared with RDBMS systems.
  • Here be dragons: One of the scary prospects of going the Cassandra way is fear of the unknown. If something goes wrong, how will we be able to debug, do an RCA in a timely manner without any tooling/experience, and fix the issue?

Appendix I

                               Cassandra         Riak                    HBase
CAP                            AP                AP                      CP
Language                       Java              Erlang, C, Javascript   Java
Type                           Column oriented   Key-Value               Column oriented
Protocol                       Thrift/custom     REST/custom             REST/Thrift
Tunable tradeoffs for
distribution and replication   Yes (N,R,W)       Yes (N,R,W)             No
Map/Reduce                     Through Hadoop    Built in                Through Hadoop
Distributed Counters           Yes               No                      Yes
Built-in Sorting               Yes               No (map reduce)         Yes
