Friday, July 01, 2016

Redshift Cloud-based data warehousing solution


Redshift is implemented using the ParAccel Analytical Database, or PADB (ref). From ParAccel's definition:

PADB is architected as a columnar, compressed and massively parallel relational database management system that is capable of all-in-memory, if desired.

The main component is the Leader Node (the interface to the outside world), whose responsibilities are to manage communication between nodes, control sessions, parse and optimize queries, and schedule the execution of the workload (it does not participate in any data operations).

The second component is the Compute Nodes, responsible for processing and storing data and for sharing the workload evenly among themselves. These nodes receive compiled query code from the Leader node (based on the execution plan) and send back results once execution is finished.

Each node works with its own disks, memory and CPU, but nodes may depend on each other when intermediate result sets are transferred during join/aggregation operations. To increase parallelization (through multi-core capability), these nodes are further partitioned into slices, each with a dedicated portion of memory and disk space, where queries and database processing are finally executed.

The other component is the Communication Fabric, which uses custom Gigabit Ethernet and provides high-bandwidth, private, close-proximity and very high-speed communication between the Compute Nodes and the Leader. The Compute Nodes are located on an isolated network not accessible to client applications.

A SAN backend is not mandatory but is useful for backup, snapshot and restore when in High Availability (HA) mode. In fact, Redshift is optimized to work with fast direct-attached storage (DAS), as is typical of Cloud-based clusters built on commodity hardware.

Source: Amazon Redshift Database Developer Guide

Key features

Redshift's main selling points are:

  1. A shared-nothing MPP high-performance DB that does not require building redundant data-access structures. No need to create and maintain indexes, materialized views, physical aggregations, summary tables, etc.;
  2. Suited to a broad range of schema models. No need to denormalize or build a dimensional data model for improved performance;
  3. Columnar storage offering high compression ratios and efficient utilization of I/O. Only the data blocks needed are read, as opposed to row-oriented storage (BI queries usually involve a fraction of the available attributes, which penalizes row-oriented DBs where all attributes are read on each block I/O);
  4. Leverages low-cost commodity computers with DAS storage (either mechanical HDD or onboard flash SSD), common to all Cloud-based environments.

Schema model

Redshift/PADB is supposedly schema-agnostic, in the sense that you don't need to favor one modeling technique over another. Let me comment on that a bit.

Data warehouse implementations using an RDBMS rely on a variety of techniques built solely to improve performance. We add physical structures such as indexes and materialized views, schema-level structures like denormalized dimensional models, and potentially external ones using a completely different physical schema (like MOLAP). All of these add up in both complexity and storage requirements. Redshift moves away from this trend by storing only the needed data (typically modeled in 3rd NF) without redundancy.

Most of Redshift's performance gain comes from 1) its columnar orientation and 2) its MPP features. The first point discourages highly normalized models: these tend to have narrow tables (with few attributes) and consequently gain little I/O performance (per block read) compared to row-oriented databases. Furthermore, the MPP features induce locality sensitivity during join and aggregation operations. Joining rows locally is best, but performance degrades quickly once those rows get distributed across different nodes. This again discourages highly normalized models, especially for relationships/associations modeled as m-to-n structures, where rows can be distributed using only one of the foreign keys.

Personal recommendation: I would favor some level of de-normalization of the data model (especially if your model was designed for integration purposes) but without going to the extreme of defining only a few wide denormalized dimensions.

Redshift Constructs

There are numerous constructs in Redshift not commonly found in typical relational DBs. These need to be considered during physical data modeling and are determinant for performance.

Data Distribution style

As with any cluster-based MPP database, Redshift's scalability is obtained by distributing data among the cluster nodes. The goal is to distribute the workload uniformly among all nodes and to have data collocated (data from different tables participating in joins and aggregates stored on the same compute node).

Rows can be distributed in different ways:

1. EVEN Distribution: rows are distributed evenly in a round-robin fashion.
2. KEY Distribution: rows are distributed according to a specified key (known as sharding)
3. ALL Distribution: rows are simply replicated on all nodes

During query execution, the goal is to minimize the (costly) redistribution of rows needed to perform joins and aggregations. So KEY distribution for two large tables frequently joined together should be defined on their FK/PK pair. Smaller and rarely updated tables will benefit greatly from ALL distribution without much impact on storage space. Volatility is critical, as propagating updates to all compute nodes can be costly. EVEN distribution is the default style, used when no clear a-priori information dictates which distribution to choose.
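The three styles are easy to picture with a toy sketch in plain Python (illustrative only, not Redshift's actual implementation): rows are assigned to slices round-robin (EVEN), by hashing a distribution key (KEY), or copied everywhere (ALL).

```python
def distribute_even(rows, n_slices):
    """EVEN: round-robin assignment of rows to slices."""
    slices = [[] for _ in range(n_slices)]
    for i, row in enumerate(rows):
        slices[i % n_slices].append(row)
    return slices

def distribute_key(rows, n_slices, key):
    """KEY: hash the distribution key, so equal keys land on the same slice
    (which is what keeps joins on that key local)."""
    slices = [[] for _ in range(n_slices)]
    for row in rows:
        slices[hash(row[key]) % n_slices].append(row)
    return slices

def distribute_all(rows, n_slices):
    """ALL: the full table is replicated on every slice."""
    return [list(rows) for _ in range(n_slices)]

orders = [{"order_id": i, "cust_id": i % 3} for i in range(6)]
by_key = distribute_key(orders, 2, "cust_id")
# every cust_id is held by exactly one slice:
for c in range(3):
    assert sum(any(r["cust_id"] == c for r in s) for s in by_key) == 1
```

A join on `cust_id` against another table distributed on the same key would then never need to move rows between slices.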

Column Compression

The columnar storage format implies that data belonging to the same column is stored sequentially on disk. This offers good compression potential (column data shares the same format and is often similar in content) and, consequently, better read performance, as more data (Redshift stores data in 1 MB disk blocks) is read per I/O.

An appropriate compression encoding must be chosen for each data column. Besides the basic RAW encoding, which simply stores data uncompressed, one can choose between:
  1. Byte-dictionary encoding (BYTEDICT)
    • Uses a byte dictionary where each value is mapped to one of up to 256 unique values
    • Applicable to all types except BOOLEAN
    • Ideal for long character types having fewer than 256 unique values (per block)
  2. Delta encoding (DELTA, DELTA32K)
    • Stores only the difference from the preceding row
    • Applicable to types with a natural ordering (INT, DATE, TIMESTAMP, ..)
    • Ideal for datetime or sequential-number columns, as the delta between rows can be represented more compactly
  3. LZO encoding (LZO)
    • Compresses data using the LZO scheme, frequently used with text
    • Applicable to all character types (CHAR, VARCHAR, ..)
    • Ideal for long free-form text
  4. Mostly encoding (MOSTLY8, MOSTLY16, MOSTLY32)
    • Compresses "small" values while leaving larger ones in raw format
    • Applicable to number types (INT, BIGINT, DECIMAL, ..)
    • Ideal when the data type used is larger than most of the values stored (only a small fraction needs the extra space)
  5. Run-length encoding (RUNLENGTH)
    • Stores each value as a token plus a count giving the number of consecutive repetitions
    • Applicable to all types
    • Ideal when data is often repeated in consecutive rows
  6. Text encoding (TEXT255, TEXT32K)
    • Stores a separate dictionary of unique words (up to 245 unique values) and stores other words uncompressed
    • Applicable to VARCHAR types
    • Ideal when the data frequently contains the same words
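Two of these schemes are simple enough to sketch in a few lines of plain Python (illustrative only; Redshift's actual on-disk formats are more involved):

```python
def runlength_encode(values):
    """RUNLENGTH idea: store [value, repeat_count] for each consecutive run."""
    runs = []
    for v in values:
        if runs and runs[-1][0] == v:
            runs[-1][1] += 1
        else:
            runs.append([v, 1])
    return runs

def delta_encode(values):
    """DELTA idea: store the first value, then only row-to-row differences
    (small deltas need fewer bytes than the full values)."""
    return [values[0]] + [b - a for a, b in zip(values, values[1:])]

def delta_decode(deltas):
    out = [deltas[0]]
    for d in deltas[1:]:
        out.append(out[-1] + d)
    return out

assert runlength_encode(["CA", "CA", "CA", "NY"]) == [["CA", 3], ["NY", 1]]
assert delta_decode(delta_encode([1000, 1001, 1003])) == [1000, 1001, 1003]
```

Both work best precisely in the "ideal" cases listed above: repeated consecutive values for RUNLENGTH, and naturally ordered sequences for DELTA.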

Sort key

Redshift can define one or more columns per table as the sort key. Rows are sorted accordingly during the initial load. This is especially useful during reads, where data blocks can be skipped entirely to minimize I/O, typically with range-restricted predicates (e.g. where population between 1000 and 2000). This is possible for every column because Redshift keeps track of the minimum/maximum values of each data block.

Sort keys can be either 1) Compound or 2) Interleaved. The first is useful when predicates use the same prefix of the key columns (or a subset of it): with keys K1, K2, K3, "where K1 = .. and K2 = .." is fine, but "where K2 = .." is not. The second is used when any combination of the keys can appear as predicates, without worrying about which one is used.

You define sort keys based on the queries that frequently use the same columns as equality/range predicates. For example, if recent rows are accessed more frequently, you'd sort on the timestamp column. If some tables are very frequently joined together, you can even define their sort key to also be their distribution key and let the query optimizer use a sort merge join (very fast, as sorting is skipped) instead of the slower hash join.
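The block-skipping idea behind sort keys can be sketched in plain Python (a toy "zone map": the 4-values-per-block size is arbitrary, standing in for Redshift's 1 MB blocks):

```python
def build_zone_map(sorted_values, block_size=4):
    """Split sorted data into blocks and record (min, max) per block."""
    blocks = [sorted_values[i:i + block_size]
              for i in range(0, len(sorted_values), block_size)]
    return [(min(b), max(b), b) for b in blocks]

def range_scan(zone_map, lo, hi):
    """Read only blocks whose [min, max] overlaps the predicate range."""
    hits, blocks_read = [], 0
    for bmin, bmax, block in zone_map:
        if bmax < lo or bmin > hi:      # whole block outside range: skip it
            continue
        blocks_read += 1
        hits += [v for v in block if lo <= v <= hi]
    return hits, blocks_read

zm = build_zone_map(list(range(100)))   # 25 blocks of 4 sorted values
vals, read = range_scan(zm, 10, 13)
assert vals == [10, 11, 12, 13] and read <= 3   # most blocks never touched
```

On unsorted data the min/max ranges of the blocks would overlap heavily and almost nothing could be skipped, which is why the sort key matters so much here.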

Best Practices

From the Redshift developer guide, there are a few important practices to keep in mind:
  • Make sure you define a Distribution style for each table, unless the default EVEN style is satisfactory
  • Make sure you define a good Sort key
  • Let the COPY command decide on the best Compression encodings
  • Define data relationship integrity constraints (they will not be enforced but still help the query optimizer)
  • Use the smallest format size for each data type (temporary tables created during long queries use uncompressed data with the format defined in the DDL)


Note: I’m now using Markdown for all my notes, so I can convert any part of them into HTML using a Markdown-to-HTML conversion tool. Writing blog posts this way is such a big time saver!

Tuesday, May 17, 2016

DataPlatform Franchising

I have recently changed (again) career path. My last job involved quite a long commute.. the daily 3.5 hours spent on train, bus, metro and tram were starting to take their toll on me! So I decided this was actually a good opportunity to move on to something else!

What about looking at Cloud computing applied to Business Intelligence (BI) and Data warehousing (DW)? DW in the cloud, in contrast to on-premise, is tailored for organisations not willing (or no longer able) to run expensive, risky and over-budget DW platforms on-premise. It mitigates risk and lowers expenses by offering the pay-per-use cost model common to cloud computing and SaaS.

Looking for ways to experiment with various Cloud DW solutions, I'm currently working on a project that I coined DataPlatform Franchising.

This project capitalizes on 1) my experience acquired during 15 years doing BI projects in the industry, combined with 2) the maturity level recently reached by Data Warehousing/Analytical Cloud providers (or DW in the Cloud).

And I believe it could also lead to the emergence of DataPlatform Franchising, a logical step for Cloud computing and Analytics. For those interested, I'll be describing and commenting on this experimentation at dataPFranc.


Monday, March 14, 2016

Cloud vs data warehousing

Things move quickly when software moves to the Cloud. For the software vendor, there is no longer a need to maintain x number of code versions to fit everyone's preference in hardware. Adoption can also happen very fast, as the entry cost is pretty much nothing for any new client interested in trying out the offsite platform.

I'm currently spending effort to get up to speed with Cloud-provided DW solutions. They could be disruptive to a conservative industry like DW. Here are a few factors I see impacting the adoption rate.

Cloud computing trend

In recent years, the trend of cloud computing has increased both in scope and popularity to reach near ubiquity. It started off with simple needs like storage space, evolved into more advanced offers like computing resources letting users run their own software (IaaS/PaaS), and has recently moved up the abstraction ladder with complete solution and service stacks being offered (SaaS).

In the realm of Business Intelligence and data warehousing, this trend did not pick up so rapidly (political and security worries), but things are quickly catching up with recent offerings like Redshift (Amazon AWS), SQL Data Warehouse (Microsoft Azure) and the Elastic Data Warehouse Service (Snowflake). There are many reasons to believe this will grow a lot more, but the simplest and quickest answer is cost: doing Business Intelligence and data warehousing projects on an on-premise platform is an expensive adventure! (link)

Data Deluge and new Tools

The Business Intelligence domain has also increased in complexity recently, following the advent of what has been called the data deluge. The appearance of a multitude of new data sources (social media, connected devices, the Internet of Things) has challenged the relevance and adaptability of traditional BI solutions. BI tools and platforms were designed mostly around data generated by operational systems, where data type (structured) and volume were still manageable.

Besides having created its own set of new buzzwords like Big Data, NoSQL, Data Science and Predictive Analytics, the current trend has clearly been disruptive in terms of data types (semi-structured and unstructured) and data volume/velocity.

The industry and open community were quick to devise new tools borrowing ideas from Google's scalability architecture (commodity hardware and the map-reduce paradigm), focused mostly on handling the sheer data volume and velocity or burst rate. These tools have been loosely defined and grouped under the NoSQL category. No matter how good they are at handling the 3 V's of big data, they all fall short on the most important aspect of any BI analytics solution: the data integration bit!

The introduction of these new tools significantly increases BI environment complexity and thus requires, more than ever, formal and proper architecture blueprints and principles. Otherwise your entire BI platform's integrity is jeopardised, and on-premise TCO will increase even more. The alternative is to rely on a Cloud solution provider to supply solid architecture expertise and to offer a new cost model with no initial investment.

Data Integration & architecture (more relevant than ever)

Everyone today is seeking to extract useful information from raw data in order to gain knowledge and make better, informed decisions (i.e. data-driven decisions). No matter how valuable your data asset is, it will have many shortcomings if kept in its raw format.

Raw data is inconsistent, incomplete, plagued with errors, unconstrained, unformatted, outdated, etc. Modeling data implies applying rules/checks, standardization and harmonization across the different sources. Data integration implies quality assessment and data correction. Hence only data integration and modeling can lead us to the "desired" qualities of data, i.e. the five C's of data (see Rick Sherman's BI guidebook):
  1. Clean (dirty, missing data treated) 
  2. Consistent (knowing which data version is right) 
  3. Conformed (enterprise-wide common and comparable data) 
  4. Current (data latency adapted to context) 
  5. Comprehensive (breaking departmental or sectional silos) 

It is really hard work, time-consuming, and requires a lot of thinking... but there is no alternative. Don't believe in any magic solution or silver-bullet tool that promises to transform raw data into actionable information!

We are faced with a multi-source data integration problem: the new sources behind the data deluge should be integrated with, and complement, the existing traditional BI data layers in some form or another (consolidated/aggregated), not merely bury us under massive and unwieldy data, i.e. the deluge!

Transforming your raw data into knowledge (the "golden" path: raw data -> information -> knowledge) is conditioned on preliminary work focused on analysing and modeling the data. This is especially relevant with the semi-structured data found in Big Data, where no explicit model or constraints exist. There are usually only implicit rules (schema-less), making raw data much harder to work with.

How can a Cloud-based solution help with that? Along with your Cloud solution provider comes a highly skilled and specialised team dedicated to data integration and to the particular Cloud technology, so you can avoid having to find and train your own team for that purpose. Everyone should focus on their own core competency and avoid building/duplicating specialised skills.


Wednesday, February 24, 2016

Neo4J, the Graph database

At work, we are confronted with the implementation of a database involving rich and densely interconnected data (.. more relationships than entities) and complex recursive queries. We thought it would be a good opportunity to test out a Graph database.

In a nutshell, Graph databases are a good candidate with:
  • a highly connected dataset
  • data access through multi-step traversals (finding things in a network/hierarchy of nodes)
  • data access that is flexible and not known in advance (the path changes according to the nodes encountered along the way)
  • ultimate flexibility, with thousands of classes (node types) having different properties, and new classes/properties defined every day!
We decided to go with Neo4J, which seems to be the most stable implementation with the longest track record. The following notes are specific to Neo4J, but some of the general advice is probably applicable to other implementations.

I've taken some notes that I share here... (I've also worked more on the Cypher language and will try to post my Python notebook experimentation later).

1) Nodes

These are our entities, characterised by attributes called properties. The model is schema-less, so entities may have different properties. Things to avoid:

  • Nodes with rich properties, e.g. a Country node with language spoken, currency, etc. (it's a smell indicating these properties should probably be Nodes themselves) 
  • Coarse nodes: nodes should be fine-grained, so avoid nodes representing multiple concepts.
2) Relationship

Relationships provide access. Access through a global index is not typical; instead we traverse, with relationships as the primary means of accessing nodes. Things to watch for:
  • Properties on relationships that actually represent entities (frequent with relationships like "WorksFor", "GrantedTo", ..)
  • Relationships are used to access neighbouring nodes: starting from one node, we traverse many others using the relationship type and properties.
3) Model

Data modeling is fun and easy to do with simple whiteboard drawings: you model by example, not by class! It is therefore quite suitable to do with business people and to use as a communication tool.
The model is typically normalised, and any property appearing in multiple places should probably be a node.

4) Anti-Patterns

- Avoid hefty nodes: User nodes with many properties like name, email, address, job, ... often these properties correspond to nodes of their own.

- Avoid missing nodes: "John emailed Tim" should be 3 nodes, with the email itself a node, not a relationship between the nodes John and Tim.

- Avoid hot nodes: nodes having a very large number of incoming relationships.

5) Graph Querying

2 approaches are possible for querying a Graph DB:
  1. Graph traversing
    • Start off with some nodes (the roots)
    • Spread out from these according to given path
    • Yield some side-effect in the process (intermediate computation, ..)
  2. Graph pattern matching 
    • Provide an abstract (sub)graph with any number of defined instances
    • Graph engine determines all subgraphs of the larger one fitting the pattern

The first category (imperative) is used by traversal languages like Gremlin (which can support both approaches). The second (declarative, like SQL) is used by languages like Cypher, and by SPARQL for pattern matching over RDF.
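To make the two approaches concrete, here is a toy sketch in plain Python (not Cypher or Gremlin) over a graph stored as an adjacency dict; the node and relationship names are made up for illustration:

```python
graph = {
    "Alice": [("WORKS_FOR", "Acme")],
    "Bob":   [("WORKS_FOR", "Acme")],
    "Acme":  [("LOCATED_IN", "Paris")],
    "Paris": [],
}

def traverse(start, depth):
    """Graph traversal: start from a root node and spread out along
    relationships, up to a given depth."""
    frontier, seen = {start}, {start}
    for _ in range(depth):
        frontier = {dst for n in frontier for _, dst in graph[n]} - seen
        seen |= frontier
    return seen

def match(rel_type):
    """Graph pattern matching: find every (src)-[rel_type]->(dst) pair
    fitting the abstract pattern, wherever it occurs."""
    return [(src, dst) for src, rels in graph.items()
            for rt, dst in rels if rt == rel_type]

assert traverse("Alice", 2) == {"Alice", "Acme", "Paris"}
assert match("WORKS_FOR") == [("Alice", "Acme"), ("Bob", "Acme")]
```

Traversal is imperative (you say how to walk); pattern matching is declarative (you say what shape to find and let the engine locate every occurrence).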

6) Loading Data 

There are a few ways to load data into Neo4J: 1) transactional (live updates through its REST API), 2) batch (for high volumes like an initial load), 3) Cypher (similar to using DML commands in SQL), or 4) through an API based on Java or another language.

Various tools can also be used for data loading, whether XLS-based, Neo4J-shell based, CLI-based (command-line programs like the batch importer), the Neo4J browser application, or ETL-based.


Wednesday, July 08, 2015

Anchor model + Data vault

I have already discussed the data modelling techniques promoted by the Data Vault (DV) and Anchor Modelling (AM) approaches. Now, what if I were to borrow ideas from both techniques to come up with a different model? This post does exactly that, with the aim of improving on somewhat contradictory criteria: flexibility and coherence!

Note that this proposal is subjective and based on my preference and experience in data warehousing architecture. In the end, the resulting data model could go either way, as both would yield very close solutions. The fact that both have arrived at similar modelling techniques while originating from independent and different backgrounds is a testimony to their sound principles.

This post uses terms from both DV and AM to name data model primitives according to the relevant context. For anyone not familiar with both contexts, here is the DV-to-AM term mapping:

Hub       =  Anchor   
Satellite =  Attribute
Link      =  Tie      
Ref       =  Knot     

1. Naming convention

For naming, I'd adopt the easier and friendlier DV style. Although AM proposes a completely formalised and rigid convention, I find expressiveness and clarity to be important virtues when it comes to naming things. The AM naming convention serves objectives like full automation and collision avoidance, but it turns out that its use of mnemonics and descriptive text results in long and cryptic column and table names. DV style is quite soft in this respect, with a simple annotation (suffix or prefix) of tables based on their primitive type.

Preference:   DV (favour simplicity over rigidity)

2. Audit / Metadata information

In this respect, I'd rather use the AM approach, which cleanly separates all ETL-related metadata from the real data. Each data element has a FK referring to its metadata stored in a separate table (for things like data source, load timestamp, job details, ...). Mixing metadata and data blurs the line between the two, and could mislead users into confusing the temporal validity of an attribute (when a new value became valid vs. when we learned about this new value).

Preference:   AM (get the metadata out of the model)

3. Reference data 

Reference data in DV seems to be more of an afterthought than a valid 4th model type. Ok, this is a subjective opinion, but in AM, Knots are an official primitive type that even impacts the name of the underlying primitive (e.g. a knotted attribute, a knotted tie). The way I see it, there is good motivation to account for low/fixed-cardinality and immutable descriptive reference data.

Preference:  AM (make reference data first-class citizen)

4. Temporal Relationship 

Links in DV are multi-way relationships identifiable through the combination of all referred Hubs. These are recorded as a unique tuple of FKs, with an artificial surrogate key added as the PK. In contrast, AM's relationship (the Tie) is uniquely identified by some set of Anchors, but in addition can have optional non-identifying Anchors (or Knots) for which we need to keep historical changes. As alluded to in a previous post, this adds the advantage of supporting temporal relationships directly in the Tie, at the cost that a Tie can no longer support attached Attributes.

I'd argue for increased flexibility: let's support both styles and apply the one most suited to the use case. For example, apply the DV style when we need to keep track of attributes inherent to a multi-way relationship; in other words, when no single Hub uniquely identifies the relationship, avoid creating an artificial Anchor just to support Attributes, as is typically done in AM. In contrast, leverage the inherent temporal features of the AM style when relationships go through various mutable states that we need to track, or when the relationship has a unique Hub identifier.

Preference:  DV-AM (allow for more flexibility with Relationship)

5. Time Point or Time Segment 

When we need to keep track of Attribute changes, AM's 6th NF dictates no data redundancy, such that only a single time point is stored. In DV, the full time segment is stored, which requires an "update" operation during loading to close the preceding value. The advantage of AM is the more elegant and optimised "insert-only" strategy, while DV favours cleaner and easier query logic and execution. Here, I'd accept either approach, because the choice mostly depends on the physical DB engine's capabilities. I thus make the DV time segment optional rather than mandatory: use it only due to constraints of your DB engine implementation.

(I like this idea that every batch load will only bring new values, but ...)

Preference: AM (use a single time point if you can afford it)
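The equivalence between the two styles can be sketched in a few lines of Python: given AM-style single time points for one attribute, the DV-style segments are derivable by closing each value with the start of the next one (a hedged illustration with made-up dates; real loaders would also carry keys and metadata):

```python
# Insert-only history à la AM: one (valid_from, value) point per change.
points = [
    ("2015-01-01", "Single"),
    ("2015-06-15", "Married"),
    ("2016-02-01", "Divorced"),
]

def to_segments(points, open_end="9999-12-31"):
    """Derive DV-style (valid_from, valid_to, value) segments from time
    points: each segment ends where the next value begins."""
    pts = sorted(points)
    ends = [frm for frm, _ in pts[1:]] + [open_end]
    return [(frm, to, val) for (frm, val), to in zip(pts, ends)]

segments = to_segments(points)
assert segments[0] == ("2015-01-01", "2015-06-15", "Single")
assert segments[-1] == ("2016-02-01", "9999-12-31", "Divorced")
```

This is the sense in which the segment is "optional": if the DB engine handles the derivation efficiently (e.g. via a window function), storing only the points loses nothing.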

6. Business Key  

In DV, the business key (aka natural key) is considered immutable and stored along with the Hub data. No such assumption is made in AM, and arguably this offers better long-term flexibility (see my previous post). Here I definitely adopt the AM style and use a dedicated natural key stored in an external Attribute table, as it accounts for more situations down the road: alternate natural-key replacement, natural-key temporal mutability, etc.

There are also cases for weak entities (i.e. entities having a PK coming from another Anchor) ... natural-key Views were also discussed, and .

Preference: AM (most things are mutable, so avoid an immutability assumption for the business key)

7. Satellites or Attributes normalised form   

In AM, 6th NF imposes a very strict rule: 1 Attribute = 1 Table. I clearly see the advantage of this for temporal attributes. However, I'd rather use the more flexible DV approach, where the rule is more relaxed...

Aside from the table-explosion issue, I see two arguments in favour of regrouping some attributes à la DV:

  • Some Attributes are not meant to be historised (for example, most transactions are atomic and immutable, so a change of state can only happen in a subsequent or corrective transaction)
  • Some Attributes are purely functionally dependent on another attribute (e.g. a code often has a descriptive tag attached to it) 
Preference: DV (favour flexibility and allow for one or more attributes) 


Friday, June 05, 2015

Spark data processing/analytics platform

At work we are looking to leverage a large-scale data processing/analytics platform called Spark. Before doing hands-on work, I always do some research to get started with better insight and context on a large platform. So this post summarises my notes on one of the hottest Big Data technologies, which has officially passed the peak of inflated expectations!

Spark is a data analytics/processing platform optimised for iterative algorithms, interactive data mining and batched analytics.

Spark provides a unified platform leveraging MapReduce (MR) parallel data processing capabilities while avoiding some limitations of its open source ancestor, Hadoop. Spark came out of a functional programming model like MR; however, it is also a broader generalisation of that model, as more than just MR-type tasks are supported.

Underneath Spark's platform architecture lies the Spark execution engine. On top of that sit extra modules like Shark for SQL capability, Spark Streaming, MLlib for machine learning, GraphX for graph-based computation, and language-specific libraries (3 supported: Scala, Java and Python). See here for an example of one vendor's platform integration stack.

Also key is its data storage abstraction model, the Resilient Distributed Dataset (RDD), which was designed for coarse-grained data transformations operating over the majority of data elements. Consequently it is not suited to operating asynchronously over fine-grained shared data, as is typical of an RDBMS.

Core Primitives


RDDs can be qualified as the cornerstone of Spark data management. In essence, they are a data abstraction for immutable and fault-tolerant collections of elements built explicitly for parallelism on a cluster. Immutability supports a more efficient fault-tolerance mechanism, which is critical in a distributed environment.

Data is processed by chaining successive RDD transformations, each performing bulk writes to a new RDD, each of which can easily be recovered by reapplying the same transformation definition in case of node failure or a straggler (slow node). This leads to an important property called lineage, obtained directly from the resulting directed acyclic graph (DAG) of all transformation definitions.

From this DAG lineage property, it follows that any RDD can easily be re-created after a node failure by regenerating its source RDDs.

Simple DAG showing RDD lineage

RDD persistence is controllable, so it is up to the user to decide which steps should be cached for future re-use and/or better performance.

Cached RDDs are stored as Java objects for fast access within the JVM, but can also be stored on disk in serialised form (which can have a smaller footprint than the Java objects) when node memory becomes scarce.

Key-Value RDDs, or simply pair RDDs, are specialised RDDs supporting KV pairs, where each element of the RDD is a pair tuple (k, v).

RDD Persistence

Every RDD can further be persisted in different ways, offering a sort of data hook for re-use in another context. This is very convenient for ad-hoc data analysis or machine learning, where the same state of a dataset can be cached and re-used in various ways. This contrasts with the more batch-oriented, synchronous operations typical of Hadoop.

Storage is characterised by:
  1. The Storage levels (memory-only, memory and disk)
  2. Explicit API calls (persist(), cache())
When deciding to persist or cache, one must weigh the gain over a full re-computation against the total memory needed vs. what is available on the nodes.

So in a nutshell, RDD has these properties :

  • immutable once constructed
  • track lineage information so they can be recomputed in case of lost data or node failure
  • enable operations on collection of elements in parallel

and can be instantiated:

  • by parallelizing an existing collection (e.g. after generating a list in Python, Spark will split it into many partitions and distribute these among the nodes) 
  • by transforming any existing RDD (sourcing)
  • from input files (in various storage systems and formats) 

and the user controls behaviour through two properties:

  1. its distribution property (number of nodes to be partitioned into) 
  2. its persistence (whether or not data should be cached for faster downstream consumption). 


The operations defined on RDD are either Transformation or Action.

Transformations consist of operations (map, filter, distinct, union, grouping) applied to a given RDD and leading to the generation of a new RDD node (a from-to edge in the DAG). All transformations are lazy: they execute only when necessary (i.e. when an action is triggered or a reduce operation is needed).

Actions consist of applying operations over all elements of an RDD (associative functions like reduce, or others like count, first, takeSample, foreach, ...) and producing some output value.

conceptual role of Transformations and Actions 
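The transformation/action contract can be sketched with a tiny lazy "RDD-like" class (pure Python, nowhere near real Spark, just to illustrate laziness and lineage replay):

```python
class MiniRDD:
    """Toy stand-in for an RDD: transformations only record a lineage of
    deferred steps; nothing executes until an action is called."""

    def __init__(self, data, steps=()):
        self._data, self._steps = data, steps   # source + lineage

    def map(self, f):                           # transformation: lazy
        return MiniRDD(self._data, self._steps + (("map", f),))

    def filter(self, p):                        # transformation: lazy
        return MiniRDD(self._data, self._steps + (("filter", p),))

    def _compute(self):                         # replay the lineage
        out = iter(self._data)
        for kind, f in self._steps:
            out = map(f, out) if kind == "map" else filter(f, out)
        return out

    def collect(self):                          # action: triggers execution
        return list(self._compute())

    def count(self):                            # action
        return sum(1 for _ in self._compute())

rdd = MiniRDD(range(10)).map(lambda x: x * x).filter(lambda x: x % 2 == 0)
assert rdd.collect() == [0, 4, 16, 36, 64]
assert rdd.count() == 5
```

Because only the source and the steps are kept, any "lost" result is recoverable by replaying the lineage, which is exactly the recovery property described above.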

Data source 

Spark can interact with various input formats frequently seen in high-end analytics platforms:
  1. Local files (file://...), directory or SequenceFile
  2. Hadoop Distributed FileSystem (hdfs://...)
  3. HBase 
  4. S3


Closures:

These are the function literals defined in a Transformation or an Action. These closures are represented as Java objects which Spark pushes to all Workers (after serialization/deserialization). There is one closure per Worker, and they are sent one way: driver --> worker.

The same method is used to push any global variable as well, although this is not ideal for sharing information/state among Workers and back to the driver.  To share state, there are better approaches: the special variables called Accumulators and Broadcast Variables.

Shared Variables:

Another abstraction is the concept of Shared Variables, which are used for parallel operations.  These serve to share state information between the nodes themselves and/or between nodes and the driver.

Broadcast variable concept
These variables are designed to help optimise the sharing of global resources throughout all nodes.   They are read-only variables cached at each node, avoiding the need to ship copies during tasks.   Spark distributes these efficiently, with the objective of reducing global communication cost.

This can be used, for example, when we want to score a predictive model applied in parallel to a very large distributed dataset.  This model should then be broadcast to all involved nodes prior to scoring.

Accumulator variable concept
These can only be added to, through an associative operation.   They are only readable by the driver program, although workers update them.  This is supported out-of-the-box for numeric types and standard mutable collections; to support other types one needs to develop an extension.

These are useful when we need global counters and sums done in parallel.
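As a rough illustration of accumulator semantics, here is a pure-Python toy (invented for this sketch, not the actual Spark Accumulator API): workers may only add to the value through an associative operation, while only the driver reads the result back.

```python
# Toy accumulator: workers can only add(); only the driver reads .value.
# Invented for illustration -- NOT the actual Spark Accumulator API.

class ToyAccumulator:
    def __init__(self, zero=0):
        self._value = zero

    def add(self, amount):        # associative, commutative update (worker side)
        self._value += amount

    @property
    def value(self):              # read back (driver side, by convention)
        return self._value

errors = ToyAccumulator()
lines = ["ok", "ERROR x", "ok", "ERROR y"]
for line in lines:                # imagine this loop split across workers
    if line.startswith("ERROR"):
        errors.add(1)
assert errors.value == 2
```

Because the update is associative, partial counts from many workers can be merged in any order without changing the final total.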

Spark Program lifecycle:

Spark code normally follows a series of steps: the data source definition (input RDDs), some transformations applied on each RDD, and finally the Actions used to retrieve the needed information:

  1. Get a SparkContext which is the entry point to all functionalities (this is implicit in interactive Shell env with the "sc" variable)
  2. Creation of source RDDs 
    • generating a dataset manually (sc.parallelize([collections...]))
    • sourcing from file (file://..), or HDFS (hdfs://...)
    • sourcing from Hadoop Input (sc.hadoopFile()) 
  3.  Creation of a series of Transformation happening sequentially from source
    • map(), flatMap(), filter(), reduceByKey(), groupByKey(), join()
  4.  Creation of Actions to return some values to the Driver
    • collect(), take(), count(), reduce(), saveAsTextFile()

All these steps are lazily defined and only an Action will trigger any processing.   Spark will go through these steps for each new job definition :
  1. Creation of the DAG with all RDD's lineage definition 
    • a direct mapping exists between the DAG and the user code
  2. Creation of a logical Execution plan optimised for the DAG 
    • Spark will aggressively pipeline operations, when these can be generated independently of other steps (ex. a read and map task can be fused together) 
    • Spark will also split tasks into stages to better exploit pipelining and caching
  3. Schedule and Execution of individual tasks according to this logical Execution plan (see next, runtime engine model).
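Step 2 (pipelining) can be pictured with a pure-Python generator sketch (again a toy, not Spark internals): a read, a map and a filter are fused into one pass over the data, never materializing the intermediate collections.

```python
# Toy sketch of operation pipelining: read -> map -> filter fused in one pass.
# Generators never materialize the intermediate datasets.

def read_source():                 # stands in for reading one partition
    yield from ["10", "25", "3", "40"]

def parse(stream):                 # "map" stage, pipelined
    for item in stream:
        yield int(item)

def keep_large(stream):            # "filter" stage, pipelined with the map
    for value in stream:
        if value >= 10:
            yield value

# One fused pass: each element flows through all three steps in turn.
result = list(keep_large(parse(read_source())))
assert result == [10, 25, 40]
```

This is the intuition behind fusing a read and a map task into a single stage: each element streams through the whole chain, so no intermediate dataset needs to be stored.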

Runtime Engine model

Spark applications involve the execution of independent sets of processes over a cluster.  This requires quite a bit of coordination, which is handled by what is called the SparkContext (SC) object.  This object runs inside the Driver program, the main entry point from which any Spark application is launched.  The SC can connect to many types of cluster manager (ex. Spark's own standalone, Mesos, YARN, ..) which handle resource allocation over the cluster.   

After connecting, Spark will acquire Executors on each Worker node.   These run in a single JVM instance located on a cluster node and spawned to serve the Spark application.  An Executor's role is to run computation and store data, done via Spark Tasks (the smallest unit of work) executing in different threads inside the JVM.   Executors remain active as long as the Spark application is running, even when no jobs are running.   This feature allows Spark to start up Tasks very quickly and process in-memory data at speeds not possible with Hadoop.    

Once Executors are spawned inside each Worker node, Spark will send the application code to these executors.  This application code can be JAR files or Python files, depending on how the application was coded against the SC.  Then finally the SC can send tasks for the executors to run. 

Important property of this model : no communication ever happens between Workers!

Task scheduling

The Job scheduler's role is to run a user-defined job as a sequence of stages, taking into account partitions with persistent RDDs in memory, the possibility to pipeline transformations (individual tasks) with narrow dependencies, and the separation of transformations with wide dependencies (which require shuffling).   The Scheduler also considers data locality and preferred locations: if a task needs to process a partition available on a specific node, the scheduler will send it to that node; and if a task processes a partition for which the containing RDD provides a preferred location (typical of HDFS files), again the scheduler will send it there.

For example, this "simple" job definition:

  ... .filter("ERROR")
  rdd2.join(rdd1, key).take(10)

Will yield the following DAG flow:
Source: databricks

After examination of this DAG, the Scheduler defines stages to execute in a way that combines or pipelines all narrow transformations.  Here, RDDs A-B would be stage 1, RDDs C-D-E stage 2, and the final join stage 3.  

In summary, Scheduling has these roles and responsibilities:
  • running all tasks in the graph (the "full" DAG in proper sequence)
  • pipelining functions when possible (ex.  a map() and filter() transformation could be pipelined together and done in one stage, as a way to avoid processing later-discarded data elements)
  • reusing cache-aware data and exploiting data locality
  • being partition-aware to reduce shuffle

Main Differentiation points:

The main selling points are actually Hadoop's biggest drawbacks, which led to the creation of many extensions and add-ons built around the Hadoop ecosystem.   Hadoop's biggest complaints relate to its inefficiency for iterative computing and interactive querying (due to the lack of a low-latency dispatcher), its disk-only based approach, and its complex and verbose coding (complex algorithms are only possible by chaining many individual MR jobs).  

By contrast, Spark offers:
  • Easy to develop environment
    • A rich API using Scala, Python or Java 
    • It leverages a real functional language, namely Scala, which aligns well with the MR (MapReduce) programming model (the idea being to minimise impedance with a programming model not directly expressible in pre-1.8 Java).
    • Scala and Python having interactive interpreters allows Spark to offer its own interactive shells (pyspark and spark-shell), which are so useful in interactive analysis. I'd say this time-saver has become the norm now. 
  •  Fast to run platform
    • The general execution graph (DAG) concept is key in providing speed and lazy transformation
    • In-memory (cache) and persistent storage really allow the platform to move away from the batch-oriented paradigm toward more real-time and iterative workflows.

Versatile deployment configuration with many possible installation options.  For example, the Cluster Manager can be deployed under these schemes:
  1. Standalone local (to run simple demo/prototyping)
  2. Standalone Cluster-based  (ex. running on same nodes as your Hadoop HDFS-based cluster) 
  3. YARN-based 
  4. Mesos-based  


Sunday, January 11, 2015

Cassandra data modelling

This post describes NoSQL Cassandra database solution with focus on data modelling aspect.

Cassandra competes in the space of NoSQL Column Family storage engines, known under various terms like Wide Column Store or BigTable storage, in reference to the influential Google BigTable paper.   Competing candidates include the HBase engine, a Java open-source implementation of the BigTable spec.   Note: I do not use the terms Columnstore or Columnar database, as these usually imply column-oriented DBMSs like MonetDB or Vertica, whose goal is to change RDBMS data storage from row-based to column-based.

In general, Column Family solutions do not fit into the 3 common categories defined in NoSQL:
  • KeyValue Store: a Column Family does not simply store a single value per key but rather holds many columns to handle wide data along these columns.  (note: a K-V Store could also hold composite values, but these would be kept and treated as a single object) 
  • Document Store: documents are usually stored using an open (as in visible) text format (JSON is the usual approach) and accessed using web-friendly languages like JavaScript and RESTful interfaces. 
  • Graph Database: a completely different paradigm is used here, following the principles of graph theory. 

The Column Family category is designed to focus on storing and processing large amounts of structured and semi-structured data in a distributed and non-relational way.   In this sense, these engines could become an alternative or complementary solution within the data warehousing industry.  

I will not go through a detailed description of the Cassandra storage engine (C*); anyone interested can find plenty of online resources, discussions and other blogs around (open source should imply an open community).   I will rather focus on its impact on, and support for, structural data modelling.

The first section gives some general aspects of the Column Family engine, followed by various C* data modelling structures and design criteria, and finally I'll comment on how it could be a good fit (or not) for the data warehousing and BI world.

Column Family Design highlights

The NoSQL community gained attention with its schema-less sales pitch, as if data modelling had now become just a burden.   We should always be wary of such simplified conclusions; in terms of modelling, whatever you model upfront will be gained down the line.  A completely dynamic, schema-less (unknown) model cannot be a sustainable and long-living model, especially for mostly read-only applications such as Business Intelligence and Data warehousing!

Although Column Family platforms surfed on the NoSQL movement (fill in your own definition), they do require and benefit from some level of a priori data modelling.   But modelling should not be approached from the same angle as traditional relational database.   The major variations originate from these key characteristics:
    1. Data's row storage assumes unique Row-key
      • Row-key is central to the data storage and access.. much like any K-V Storage engine
      • Row-key acts like a Primary key, but without any global constraint checks enabled (a shared-nothing requirement)
      • Row-key is central to horizontal scalability obtained through the use of data Partitioning (Sharding) among all distributed nodes within a cluster.  This is typically done by applying some form of hashing (ex. MD5) on the row-key.
      • Row-key can be of any type, including composite, as long as it can be hashed deterministically 
      • Row-key (or a key range) is the main predicate on which queries happen

    2. Schema is much more dynamic 
      • Forget about having to define a fixed set of columns with static type and length
      • You can still define columns for your table, but these are not mandatory for all rows
      • Collection type for a column is possible (a similar feature is found in Postgres)
      • Columns can be completely arbitrary and set dynamically by application (column value and name)

    3. Joins are not implemented by the Storage engine  
      • Joining data across Tables (i.e. Column Families) will require involvement of the application/client layer 
      • Much data denormalization and increased redundancy are usually needed to satisfy different data access paths.  
      • The data model is the result of data access pattern analysis aiming to avoid any join between Column Families (the whole data graph is available directly from the set of columns stored along each row).
    4. Columns have richer and new semantics 
      • Columns are no longer viewed only as features or attributes pertaining to each row or entity instance
      • Columns can also be viewed as data instances or values, similar to when we pivot rows into columns in a relational query 
      • Columns can grow quite extensively ... the upper limit will probably be dictated by performance or other constraints found in your application.

Cassandra Data Architecture  

At high level, C* data architecture is decomposed into these concepts:

Keyspace -->n  Column Family  --->n  Row --->n  ColName-ColValue-Timestamp

A Keyspace delimits your application boundary (think of a Database or schema in the DBMS world) and will store all related Column Families onto the cluster.   A typical cluster would have one Keyspace per application, and important global properties are defined at this level (ex. replication factor).  

A Column Family is the equivalent of a relational Table: it contains columns and rows for storing data pertaining to the entity we are keeping track of.

Each Row is associated with one stored instance of this entity, identified through a mandatory and unique row-key attribute.   The most atomic storage unit is actually the triplet ColumnName--ColumnValue--Timestamp, so each row may include any number of these triplets.  The timestamp is automatically generated and is internal to C*, used for managing data versioning and conflict resolution.
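The triplet storage and its timestamp-based conflict resolution can be sketched in plain Python (a conceptual toy, not C* internals): each column holds a (value, timestamp) pair, and on conflicting writes the highest timestamp wins.

```python
# Toy sketch of a C*-style row: each column stores (value, timestamp),
# and on conflicting writes the highest timestamp wins (last-write-wins).
# Conceptual illustration only, not Cassandra's actual storage code.

def write_column(row, name, value, timestamp):
    current = row.get(name)
    if current is None or timestamp > current[1]:
        row[name] = (value, timestamp)

row = {}                                               # one row, keyed by column name
write_column(row, "Status", "Single", timestamp=100)
write_column(row, "Status", "Married", timestamp=200)  # newer: replaces
write_column(row, "Status", "Unknown", timestamp=150)  # older: ignored
assert row["Status"] == ("Married", 200)
```

This is why the timestamp is part of the atomic unit: two replicas that received writes in different orders still converge to the same column value.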

Let's look at some data modelling designs common to C*.

1-Simple Case:

The easiest use-case is to define a "static" set of columns within one Column-Family:

Row-Key               |  Columns
             | Email          Address    Phone       Status      
username1    |      address1   13332       Married     
             | Email          Address    Phone       Status      
             | --------------------------------------------------
username2    |      address2   13333       Single      
             | Email          Address    Status      
             | --------------------------------------
username3    |     address3   Married     
             | Email          Address    
             | --------------------------
username4    |     address4   

Note that all columns are optional and may simply be omitted, as opposed to being explicitly set as Nullable for handling missing values (as found in relational DBs).   More importantly, the schema-less nature means that literally all columns could change arbitrarily from row to row.   Still, we would want some form of stability when defining columns in a Column Family, in order to get some predictability in the data access pattern.

2-Wide-row Case:

Another structure is what is known as the wide row. This is a pattern well adapted for time series data.  Let's say we are recording, for a number of devices, values at fixed time points ... we may choose to store each device as one row, and generate columns dynamically into this wide row:

Row-Key               |  Columns
             | Date1-12:32:24  Date2-12:33:56  Date2-13:01:32    
             | --------------------------------------------------
device1      | low             low             high              
             | Date8-17:04:24  Date8-18:33:55  Date8-18:41:32    
             | --------------------------------------------------
device2      | low            medium          high               

Here each column name is defined by the recording timestamp, and the structure will grow dynamically to accommodate all upcoming recordings.  Although up to 2 billion columns are possible, we would limit this growth factor (for efficiency reasons) by bucketing the row-key, i.e. appending temporal data to it (ex. 'deviceID-day' or 'deviceID-YearMth', etc.) so that one row will only store recordings for a limited time span.
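The bucketing idea can be sketched as a small key-building helper (hypothetical function names, pure Python): appending a year-month bucket to the device id bounds how wide any single row can grow.

```python
# Toy sketch of row-key bucketing for time series: one row per device per month.
# Function names are invented for illustration.
from datetime import datetime

def bucketed_row_key(device_id, ts):
    # 'deviceID-YearMth' bucket, as suggested in the text
    return f"{device_id}-{ts.strftime('%Y%m')}"

def column_name(ts):
    # the recording timestamp becomes the column name in the wide row
    return ts.strftime("%Y-%m-%d %H:%M:%S")

ts1 = datetime(2016, 7, 1, 12, 32, 24)
ts2 = datetime(2016, 8, 2, 9, 5, 0)
assert bucketed_row_key("device1", ts1) == "device1-201607"
# A new month lands in a different row, capping the width of each row:
assert bucketed_row_key("device1", ts2) == "device1-201608"
```

All of July's recordings for device1 land in row 'device1-201607'; August starts a fresh row, so no single row grows without bound.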

More options are possible: we could sort the columns by time descending so that we keep the last 15 measurement points, or preserve only a limited number of measurements with the use of the TTL (time to live) property to automatically expire the oldest column values.

3-Dynamic-column Case:

We can make things even more dynamic by leveraging Cassandra's support for arbitrary, application-supplied column names to store data.  In this case, we may not even need to store any value, as the column name becomes the value itself...

As an example, let's assume we have users that subscribe to named services, and we wish to keep all their subscribed service names directly as columns:

Row-Key               |  Columns
             | service-1  service-Z  service-tt
             | --------------------------------
username1    |                                 
             | serviceEE  serviceRR  service-tt  service-A   
             | ----------------------------------------------
username2    |                                               
             | service-AAA
             | -----------
username3    |            
             | serviceA   serviceE
             | -------------------
username4    |                    

No structure is given at creation time; the application provides it dynamically at runtime. This is sometimes referred to as the Dynamic Column Family design.
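In plain-Python terms (a toy model, not C* storage), the dynamic column family above amounts to each row carrying its own arbitrary set of column names, supplied by the application at write time.

```python
# Toy sketch of a dynamic column family: the application supplies
# arbitrary column names (here, subscribed service names) per row.
# Illustrative only -- not how C* stores data internally.

subscriptions = {}                       # row-key -> set of column names

def subscribe(user, service):
    subscriptions.setdefault(user, set()).add(service)

subscribe("username1", "service-1")
subscribe("username1", "service-Z")
subscribe("username2", "serviceEE")

# Rows need not share any columns, and no value is stored:
# the column name itself is the data.
assert subscriptions["username1"] == {"service-1", "service-Z"}
assert subscriptions["username2"] == {"serviceEE"}
```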

4-Collection-based column Case:

A one-to-many association on a single row-key does not necessarily imply the creation of multiple columns for that row.  One may instead leverage C* collection data types, equivalent to the collection types supported in most programming languages (ex. Set, List and Map).

Taking the previous username data, we could exploit these to store the many email addresses or physical addresses associated with a single user:

Row-Key               |  Columns
             | Emails                 Addresses                         
username1    | {,u2@..}      {Home: address1, Work: addr2 ..}  
             | Emails                 Addresses                              
             | --------------------------------------------------------------
username2    | {,            {Home: addressHome, Work: addressW2    
             |,}   Home2: AddrHome2}                     
             | Emails                 Addresses       
             | ---------------------------------------
username3    | {}           {Home:address3} 
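The same rows modelled with collection-typed columns look, in a pure-Python toy (with hypothetical placeholder emails, since the originals are elided above), like a row whose columns hold sets and maps rather than scalars.

```python
# Toy sketch of collection-typed columns: one 'Emails' Set column
# and one 'Addresses' Map column per row (illustrative values only).

rows = {
    "username1": {
        "Emails": {"u1@example.test", "u2@example.test"},      # Set column
        "Addresses": {"Home": "address1", "Work": "addr2"},    # Map column
    },
}

# Adding one more email mutates the collection, not the row structure:
rows["username1"]["Emails"].add("u3@example.test")
assert len(rows["username1"]["Emails"]) == 3
assert rows["username1"]["Addresses"]["Home"] == "address1"
```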

Data model design criteria

To be able to produce a useful data model, one needs a minimum of knowledge about how data is stored inside a C* cluster.   An important aspect has to do with clustering.  Clustering is completely symmetrical, i.e. all nodes participating in the cluster have identical characteristics: shared-nothing, independent, identical responsibilities, self-sufficient (nodes can be added/removed on demand) with no single point of failure or contention.

Another important point is that rows, identifiable by their unique row-key, will always land on the same node (aside from replication considerations resulting in copies stored on other nodes).  The full row can then be read sequentially from file data blocks within a single node, such that random reads/writes can often be avoided.   With that in mind, here are some criteria important for performance:  
  1. Accessing data by row-key will be extremely fast (data partition with key hashing can be viewed as implicit index).
  2. Store as much data as possible into a single row so that the query/access pattern will involve a single logical sequential read.
  3. Data modelling must be aligned with data access pattern which becomes the driving factor for Data modeling!
  4. Denormalization becomes the norm as each data access path may involve the creation of new (redundant) Column Family.
  5. Design criteria have more to do with the read/write operations performed on the data than with coming up with clear semantics for each entity.
  6. Joining two Column Families is simply not possible.  We must fall back to client application logic, which is thus likely to be avoided in most cases.  By extension, C* does not support subqueries either.
  7. Avoid set operations or bulk reads/writes where a large number of rows are targeted 

C* offers rich and powerful data modelling structures, but one has to keep these criteria in mind.  This requires a shift in design thinking for anyone coming from the RDBMS world.   In summary, we'll be doing more data denormalisation and aggregation in order to:
  • Reduce Query-I/O data transfer volume 
    • (keep query and processing local!)
  • Reduce Query-Processing complexity (no JOINS)
    • (distributed system are inherently more complex) 
But this is done at the cost of:
  • Increasing total data volume storage (disk storage is cheap)  

Any good use case for BI?

It follows that C* may not be so well suited for BI.   For example, consider the no-join limitation!  Traditional BI applications strive to support many ad-hoc data access paths, implying joining data all over the place.   The entire Star Schema design and its multidimensional counterpart were created to guarantee good performance and scale no matter how many dimensions you'd want your metrics against.

Eliminating the possibility of joins means denormalizing the entire fact/dimension table structure into its constituents: all dimension hierarchical attributes flattened out alongside the metrics.  And you'd still be limited to key-lookup query types (or at best key-range lookups).  What we need instead is a capacity for Set or Bulk read operations, involving a full scan of the data in order to calculate aggregate KPIs or metrics!

As of now, we can see that C* is mostly an application-driven data management and storage solution targeting real-time operational applications (high availability and low latency).   Taken from the DataStax FAQ website, here are its main purported use cases :

  • Time series data management
  • High-velocity device data ingestion and analysis
  • Media streaming (e.g., music, movies)
  • Social media input and analysis
  • Online web retail (e.g., shopping carts, user transactions)
  • Web log management / analysis
  • Web click-stream analysis
  • Real-time data analytics
  • Online gaming (e.g., real-time messaging)
  • Write-intensive transaction systems
  • Buyer event analytics
  • Risk analysis and management

I can clearly see a good use case in the storage of real-time data collected by sensors or devices, i.e. time series data.   These are expected to explode with the advent of the Internet of Things.  C* could certainly be a viable platform to help us meet the Velocity and Volume requirements, the famous 2 V's of the no less famous Big Data buzz.    But from the earlier points, I'd say its data analytical potential is limited to basic metric calculations like moving average, min/max and other simple data analysis.   These assume data processing happening at row level as opposed to across rows!   For any analytical need crossing/joining over many dimensions, you'll need to export your data to another, more specialised analytical platform! 

Another use case is possible if we leverage the wide-row and dynamic-column aspects.  It is frequent in BI that we need to compute and preserve a large number of metrics and KPIs per business entity, calculated over some period.   For example, the customer-centric view (typical of CRM) needs a large number of metrics measuring historical behaviour/performance for each customer.   Or product portfolio analysis monitoring key indicators of profitability and popularity per product.   These applications involve flat and dynamic data, AKA a KPI/metrics flat structure.  Any RDBMS implementation will be limited in one form or another due to the static and fixed nature of its data schema.  One can use generically named columns, but guessing which data types will be needed in the future can be challenging.  One could also build some metadata-driven engine to take care of creating/deleting metrics on demand, but this will pose issues with timelines, historical data and storage.

In this sense, I see C* as offering a versatile and accommodating platform to deal with the dynamic nature of columns.    We can easily expand our KPI list to the thousands or more, seamlessly and elegantly!   We could choose different physical designs for the row grain definition, by choosing a different level for the row-key :

1- row-key = customerID (columns = PeriodId_kpiInd, ...)
 and keep only a few years worth of data with TTL

Row-Key               |  Columns
             | 2014-01:Kpi1  2014-01:Kpi2 ... 2014-02:Kpi1 ....               
customer1    | 1234.76       32           ... 1657.43      ....         
             | 2014-01:Kpi1  2014-01:Kpi2 ... 2014-02:Kpi1 ....               
customer2    | 1234.76       32           ... 1657.43      ....         

That way, we have all KPIs belonging to one customer stored in a single wide row, and leverage sequential reads for doing YTD or year-to-year comparison calculations.  We could also leverage the TTL attribute to keep only a limited number of years of historical data, and let C* manage the purging automatically.
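Design 1 can be sketched in pure Python (hypothetical column names and values): with all of a customer's KPIs in one wide row, a YTD aggregate is a single sequential pass over that row's columns.

```python
# Toy sketch of design 1: row-key = customerID, columns = 'PeriodId:KpiInd'.
# Column names and values are invented for illustration.

row = {   # one wide row for customer1
    "2014-01:Kpi1": 1234.76,
    "2014-02:Kpi1": 1657.43,
    "2014-03:Kpi1": 1500.00,
    "2014-01:Kpi2": 32,
}

def ytd(row, year, kpi):
    # a single sequential scan over the customer's own columns
    return sum(value for name, value in row.items()
               if name.startswith(year) and name.endswith(kpi))

assert ytd(row, "2014", "Kpi1") == 1234.76 + 1657.43 + 1500.00
```

The whole computation touches only one row-key, which is exactly the access pattern C* is fast at; design 2 would instead require reading one row per period.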

2- row-key = customerID-PeriodId (columns = kpiInd, ...)

Row-Key               |  Columns
             | Kpi1          Kpi2 ...   KpiN        ...      
c1:2014-01   | 1234.76       32   ...   1656557.43  ...
             | Kpi1          Kpi2 ...   KpiN        ...      
c1:2014-02   | 1657.43       37         1657921.22  ...

Here, we'd end up with much narrower rows but would require crossing different row-keys to do customer longitudinal analysis like year-to-year or other KPI comparisons throughout the customer lifetime.

One could think of alternative ways of storing these flattened KPIs/metrics, but again, this will not be suited to set-based and cross-entity pattern analysis.    Rather, it should be thought of as a flexible storage mechanism allowing limited fine-grained analysis or one-to-one entity comparison.

In summary, I see a few cases where we could make use of C* to provide BI-related functionalities; however, it is surely not a suitable storage solution when one is looking for a generic and global platform!