Showing posts with label NoSQL. Show all posts

Wednesday, February 24, 2016

Neo4J, the Graph database

At work, we are confronted with implementing a database that involves rich, densely interconnected data (more relationships than entities) and complex recursive queries.  We thought it would be a good opportunity to test out a graph database.

In a nutshell, graph databases are a good candidate when you have:
  • a highly connected dataset
  • data access through multi-step traversals (finding stuff in a network/hierarchy of nodes)
  • data access that is both flexible and unknown in advance (the path changes according to the nodes encountered along the way)
  • a need for ultimate flexibility, with thousands of classes (node types) having different properties and new classes/properties defined every day!
We decided to go with Neo4J, which seems to be the most stable implementation with the longest track record.  The following notes are specific to Neo4J, but some of the general advice probably applies to other implementations. 

I've taken some notes that I share here... (I've also worked more on the Cypher language and will try to post my Python notebook experimentation later).

1) Nodes

These are our entities, characterised by attributes called properties.  The database is schema-less, so entities may have different properties. Things to avoid:

  • Nodes with rich properties.  Ex. a Country node type with language spoken, currency, etc. (it's a smell indicating these properties should probably be nodes of their own) 
  • Nodes representing multiple concepts: nodes should be fine-grained.
2) Relationship

Relationships are for access.  Access through a global index is not typical; instead, traversing relationships is the primary means of reaching nodes.
  • Watch for properties on a relationship that actually represent an entity (frequent with relationships like "WorkFor", "GrantedTo", ...)
  • Relationships are used to reach neighbouring nodes: start from one node and traverse to many others using the relationship type and properties.
3) Model

The data model is fun and easy to build with simple whiteboard drawings: you model by example, not by class! This makes it well suited to sessions with the business, and usable as a communication tool.
The model is typically normalised, and any property appearing in multiple places should probably be a node.


4) Anti-Patterns

- Avoid hefty nodes: User nodes with many properties like name, email, address, job, ... Very often, these properties should be nodes of their own.

- Avoid missing nodes: for "John emailed Tim", make it 3 nodes with the email as a node too, rather than a single relationship between the John and Tim nodes.

- Avoid hot nodes: nodes having a very large number of incoming relationships.
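
To make the "missing nodes" point a bit more concrete, here is a minimal Python sketch using plain dictionaries rather than Neo4J itself. The labels, property names and relationship types (SENT, TO) are made up for the example and are not part of any Neo4J API.

```python
# Toy property graph: nodes keyed by id, relationships as (start, type, end).
# Labels, properties and relationship types are hypothetical, for illustration.
nodes = {
    "john": {"label": "Person", "name": "John"},
    "tim": {"label": "Person", "name": "Tim"},
    "email1": {"label": "Email", "subject": "Hello"},
}
relationships = [
    ("john", "SENT", "email1"),
    ("email1", "TO", "tim"),
]

def neighbours(start, rel_type):
    """Return the ids of the nodes reached from `start` through `rel_type`."""
    return [end for (s, t, end) in relationships if s == start and t == rel_type]

def recipients_of_emails_sent_by(person):
    """Two-step traversal: person -SENT-> email -TO-> person."""
    return [r for email in neighbours(person, "SENT")
            for r in neighbours(email, "TO")]

print(recipients_of_emails_sent_by("john"))
```

Because the email is a node of its own, more recipients, a reply or an attachment could be connected to it later without touching the John and Tim nodes.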


5) Graph Querying

2 approaches are possible for querying a Graph DB:
  1. Graph traversing
    • Start off with some nodes (the roots)
    • Spread out from these along a given path
    • Yield some side effect in the process (intermediate computation, ...)
  2. Graph pattern matching 
    • Provide an abstract (sub)graph with any number of defined instances
    • The graph engine determines all subgraphs of the larger graph that fit the pattern

The first category (imperative) is used by languages like Gremlin (which can actually support both approaches). The second one (declarative, like SQL) is used by languages like Cypher, and also by SPARQL, the generic query language for RDF.
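
The difference between the two styles can be sketched in a few lines of plain Python. This is only an illustration on a made-up adjacency list; it uses neither Gremlin nor Cypher, and the function names are mine.

```python
from collections import deque

# Toy directed graph as adjacency lists; the data is made up for illustration.
graph = {
    "a": ["b", "c"],
    "b": ["d"],
    "c": ["d"],
    "d": [],
}

def traverse(root, max_depth):
    """Imperative style: start at a root and spread out breadth-first."""
    seen, queue, order = {root}, deque([(root, 0)]), []
    while queue:
        node, depth = queue.popleft()
        order.append(node)  # the "side effect" collected along the way
        if depth < max_depth:
            for nxt in graph[node]:
                if nxt not in seen:
                    seen.add(nxt)
                    queue.append((nxt, depth + 1))
    return order

def match_two_step_paths():
    """Declarative flavour: describe the shape (x)->(y)->(z), get every match."""
    return [(x, y, z) for x in graph for y in graph[x] for z in graph[y]]

print(traverse("a", 1))
print(match_two_step_paths())
```

In the first function I spell out how to walk the graph; in the second I only describe the shape I am after and let the "engine" (here a list comprehension) find every occurrence.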

6) Loading Data 

There are a few ways to load data into Neo4J: 1) transactional (live updates through its REST API), 2) batch (for high volumes, like the initial load), 3) Cypher (similar to using DML commands in SQL), or 4) through an API based on Java or another language.

Various tools can also be used for data loading, whether they are XLS-based, Neo4J-shell based, CLI-based (command line programs like batch importer), Neo4J browser application, or ETL-based.


Martin 






Friday, June 05, 2015

Spark data processing/analytics platform


At work we are looking to leverage a large-scale data processing/analytics platform called Spark.   Before doing any hands-on work, I always do some research to help me get started and to gain better insight and context on such a large-scale platform.   So this post summarises my notes on one of the hottest technologies in Big Data, which has officially passed the peak of inflated expectations!  

Spark is a data analytics/processing platform optimised for iterative algorithms, interactive data mining and batch analytics.    

Spark provides a unified platform leveraging MapReduce (MR) parallel data processing capabilities while avoiding some limitations associated with its open source ancestor, Hadoop.   Spark grew out of the same functional programming model as MR, but it is also a broader generalisation of that model, as more than just MR-type tasks are supported.

Underneath Spark's platform architecture lies what is called the Spark execution engine.  On top of that you have extra modules like Shark for SQL capability, Spark Streaming, MLlib for machine learning, GraphX for graph-based computation, and language-specific libraries (3 supported: Scala, Java, and Python).   See here for an example of a platform integration stack of one vendor.

Another key component is its data storage abstraction model, called Resilient Distributed Dataset (RDD), which was designed for coarse-grained data transformations operating over the majority of data elements.  Consequently, it is not suited to operating asynchronously over fine-grained shared data, as is typical of an RDBMS.  



Core Primitives

RDD: 

The RDD can be described as the cornerstone of Spark data management.   In essence, RDDs are a data abstraction for immutable and fault-tolerant collections of elements, built explicitly for parallel processing on a cluster.   Immutability makes for a more efficient fault-tolerant system, which is critical in a distributed environment.

Data is processed by chaining a succession of RDD transformations, each performing bulk writes on a new RDD, and each of which can easily be recovered by reapplying the same transformation definition in case of node failure or straggling (slow node).   This leads to an important property called lineage, obtained directly from the resulting directed acyclic graph (DAG) of all transformation definitions.  

From this DAG lineage property, it follows that any RDD can easily be re-created after a node failure by regenerating it from its source RDDs.

Simple DAG showing RDD lineage
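
The lineage idea can be mimicked in plain Python. The sketch below is not Spark code: the Dataset class and its methods are hypothetical names of mine, and it only handles a linear chain of map steps, but it shows how an immutable dataset can be rebuilt by replaying its transformations from the source.

```python
class Dataset:
    """Immutable dataset that remembers how it was derived (its lineage)."""

    def __init__(self, source=None, parent=None, func=None, name="source"):
        self._source = source  # only set for root datasets
        self.parent = parent
        self.func = func
        self.name = name

    def map(self, func, name):
        # A transformation never mutates: it returns a new dataset node.
        return Dataset(parent=self, func=func, name=name)

    def lineage(self):
        """Names of all ancestors, from the source down to this dataset."""
        chain = [] if self.parent is None else self.parent.lineage()
        return chain + [self.name]

    def compute(self):
        """(Re)build the data by replaying the lineage from the source."""
        if self.parent is None:
            return list(self._source)
        return [self.func(x) for x in self.parent.compute()]

numbers = Dataset(source=[1, 2, 3])
doubled = numbers.map(lambda x: x * 2, name="doubled")
shifted = doubled.map(lambda x: x + 1, name="shifted")

print(shifted.lineage())
print(shifted.compute())  # calling it again after a "failure" gives the same data
```

Nothing is stored in the derived datasets, so losing one costs nothing but the time needed to replay its chain of transformations.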


RDD persistence is controllable, so it is up to the user to decide which steps should be cached for future re-use and/or better performance.

Cached RDDs are stored as Java objects for fast access within the JVM, but they can also be stored on disk in serialised form (which may have a smaller footprint than the Java objects) in case memory on the nodes becomes scarce.

Key-value RDDs, or simply pair RDDs, are specialised RDDs that support KV pairs: each element of the RDD is a pair tuple (k, v).
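
As a rough idea of what working with (k, v) pairs looks like, here is a pure-Python word count. The reduce_by_key helper is my own stand-in written for this note; it only imitates on a local list what a by-key reduction does on a pair RDD.

```python
def reduce_by_key(pairs, func):
    """Merge the values of each key with `func` (local, single-machine sketch)."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged

words = ["spark", "rdd", "spark", "dag", "rdd", "spark"]
pairs = [(w, 1) for w in words]  # each element is a (k, v) tuple
counts = reduce_by_key(pairs, lambda a, b: a + b)
print(counts)
```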


RDD Persistence

Every RDD can further be persisted in different ways, offering a sort of data hook for re-use in other contexts.   This is very convenient when doing ad-hoc data analysis or machine learning, where the same state of a dataset can be cached and re-used in various ways.   This contrasts with the more batch-oriented synchronous operations typical of Hadoop.

Storage is characterised by:
  1. The Storage levels (memory-only, memory and disk)
  2. Explicit API calls (persist(), cache())
When deciding to persist or cache, one must weigh the gain obtained compared to a full re-computation against the total memory size needed vs. the memory available on the nodes.
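
The trade-off can be illustrated with a small pure-Python sketch that counts how often the (supposedly expensive) function runs with and without caching. CountingDataset is a hypothetical class of mine; only the cache() name is borrowed from the Spark API mentioned above.

```python
class CountingDataset:
    """Lazy dataset that counts how often its (expensive) function runs."""

    def __init__(self, data, func):
        self.data, self.func = list(data), func
        self.calls = 0
        self._use_cache = False
        self._cached = None

    def cache(self):
        # Only marks the dataset: the result is kept after the first computation.
        self._use_cache = True
        return self

    def collect(self):
        if self._cached is not None:
            return list(self._cached)  # served from memory, no recomputation
        self.calls += len(self.data)
        result = [self.func(x) for x in self.data]
        if self._use_cache:
            self._cached = result
        return result

def square(x):
    return x * x

plain = CountingDataset([1, 2, 3], square)
plain.collect()
plain.collect()

cached = CountingDataset([1, 2, 3], square).cache()
cached.collect()
cached.collect()

print(plain.calls, cached.calls)
```

The cached version trades memory (it keeps the result around) for the avoided recomputation, which is exactly the balance to consider before persisting.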


So in a nutshell, an RDD has these properties:

  • it is immutable once constructed
  • it tracks lineage information, so it can be recomputed in case of lost data or node failure
  • it enables operations on a collection of elements in parallel

and can be instantiated:

  • by parallelizing an existing collection (ex. after generating a list in Python, Spark will split it into many partitions and distribute these among the nodes) 
  • by transforming any existing RDDs (sourcing)
  • from input files (in various storage systems and formats) 


The user also controls its behaviour through two properties:

  1. its distribution (the number of nodes it is partitioned across) 
  2. its persistence (whether or not data should be cached for faster downstream consumption). 



Operation:  


The operations defined on RDD are either Transformation or Action.

A Transformation consists of some operation (map, filter, distinct, union, grouping) applied to a given RDD and leading to the generation of a new RDD node (from-to node).   All transformations are lazy operations that get executed only when necessary (i.e. when an action is triggered or a reduce operation is necessary).

An Action consists of applying an operation over all elements of an RDD (an associative function like reduce, or others like count, first, take sample, foreach, ...) and producing some output value.

conceptual role of Transformations and Actions 
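
Here is a small sketch of that laziness in plain Python, built on generators. LazySeq and traced_double are names I made up; the method names simply mirror the Spark operations listed above, and the log list is only there to show when work really happens.

```python
import itertools

log = []  # records when work really happens

def traced_double(x):
    log.append(f"double({x})")
    return x * 2

class LazySeq:
    """Tiny stand-in for an RDD: transformations are lazy, actions are eager."""

    def __init__(self, make_iter):
        self._make_iter = make_iter  # zero-argument function returning an iterator

    @classmethod
    def of(cls, items):
        return cls(lambda: iter(items))

    # Transformations: build a new LazySeq, run nothing yet.
    def map(self, func):
        return LazySeq(lambda: (func(x) for x in self._make_iter()))

    def filter(self, pred):
        return LazySeq(lambda: (x for x in self._make_iter() if pred(x)))

    # Actions: pull data through the pipeline and return a plain value.
    def count(self):
        return sum(1 for _ in self._make_iter())

    def take(self, n):
        return list(itertools.islice(self._make_iter(), n))

pipeline = LazySeq.of([1, 2, 3, 4]).map(traced_double).filter(lambda x: x > 4)
work_before_action = len(log)  # nothing has run yet
first = pipeline.take(1)       # the action pulls just enough data through
print(work_before_action, first, log)
```

Note that take(1) stops as soon as one element passes the filter, so the fourth element is never doubled.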




Data source 

Spark can interact with various input formats frequently seen in high-end analytics platforms:
  1. Local files (file://...), directory or SequenceFile
  2. Hadoop Distributed FileSystem (hdfs://...)
  3. HBase 
  4. S3

Closure:

These are the function literals defined in a Transformation or Action.  Closures are represented as Java objects which Spark pushes to all Workers (after serialization/deserialization).  There will be one closure per Worker, and they are sent one way: driver --> worker.

The same method is used to push any global variable as well, although it is not ideal for sharing information/state among Workers and back to the driver.  To share state, there are better approaches, such as the special variables called Accumulators and Broadcast Variables.

Shared Variables:

Another abstraction is the concept of Shared Variables, used in parallel operations.  They share state information (through variables) between the nodes themselves and/or between the nodes and the driver.

Broadcast variable concept
These variables are designed to optimise the sharing of a global resource across all nodes.   They are read-only variables cached on each node, avoiding the need to ship copies with every task.   Spark distributes them efficiently as needed, with the objective of reducing global communication cost.

This can be used, for example, when we want to score a predictive model applied in parallel to a very large distributed dataset.  This model should then be broadcast to all involved nodes prior to scoring.

Accumulators
These can only be added to, through an associative operation (to define ..).   They are readable only by the driver program, although the workers are the ones updating them.  This is supported out of the box for numeric types and standard mutable collections.   To support other types, one needs to develop an extension.

These are useful when we need global counters and sums done in parallel.
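
Both ideas can be imitated in plain Python. In this sketch (not Spark code) the "model" is a made-up read-only lookup table, the partitions are processed in a simple loop instead of in parallel, and run_task is a hypothetical name of mine.

```python
from types import MappingProxyType

# Hypothetical "model": a read-only lookup shipped once to every worker.
broadcast_model = MappingProxyType({"low": 0.1, "high": 0.9})

def run_task(partition, model):
    """Worker-side task: score records and count the ones it cannot score.

    Returns the scores plus a local accumulator delta; the worker never
    reads the global total.
    """
    scores, unknown = [], 0
    for record in partition:
        if record in model:
            scores.append(model[record])
        else:
            unknown += 1
    return scores, unknown

partitions = [["low", "high"], ["high", "n/a"], ["n/a", "low"]]

unknown_total = 0  # driver-side accumulator
all_scores = []
for part in partitions:  # in Spark these would run in parallel on executors
    scores, delta = run_task(part, broadcast_model)
    all_scores.extend(scores)
    unknown_total += delta  # addition is associative, so merge order is irrelevant

print(all_scores, unknown_total)
```

The workers only ever add to the counter and only ever read the model, which is the whole point of the two kinds of shared variables.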


Spark Program lifecycle:


Spark code normally follows a series of steps: the data source definition (input RDD), some transformations applied to each RDD, and finally the Actions used to retrieve the needed information:

  1. Get a SparkContext, which is the entry point to all functionality (this is implicit in the interactive shell environment, through the "sc" variable)
  2. Creation of source RDDs 
    • generating a dataset manually (sc.parallelize([collections...]))
    • sourcing from file (file://..), or HDFS (hdfs://...)
    • sourcing from Hadoop Input (sc.hadoopFile()) 
  3.  Creation of a series of Transformations happening sequentially from the source
    • map(), flatMap(), filter(), reduceByKey(), groupByKey(), join()
  4.  Creation of Actions to return some values to the Driver
    • collect(), take(), count(), reduce(), saveAsTextFile()

All these steps are lazily defined, and only an Action will trigger any processing.   Spark goes through these steps for each new job definition:
  1. Creation of the DAG with all the RDD lineage definitions 
    • a direct mapping exists between the DAG and the user code
  2. Creation of a logical execution plan optimised for the DAG 
    • Spark will aggressively pipeline operations when these can be generated independently of other steps (ex. a read and a map task can be fused together) 
    • Spark will also split tasks into stages to better exploit pipelining and caching
  3. Scheduling and execution of individual tasks according to this logical execution plan (see next, runtime engine model).

Runtime Engine model

Spark applications involve the execution of independent sets of processes over a cluster.  This requires quite a bit of coordination, which is handled by what is called the SparkContext (SC) object.  This object runs inside the Driver program, the main entry point from which any Spark application is launched.  The SC can connect to many types of cluster manager (ex. Spark's own standalone manager, Mesos, YARN, ...) which handle resource allocation over the cluster.   

After connecting, Spark acquires an Executor on each Worker node.   Each Executor runs in a single JVM instance located on a cluster node and is spawned to serve one Spark application.  The Executor's role is to run computations and store data, through Spark Tasks (the smallest unit of work) executing in different threads inside the JVM.   Executors remain active as long as the Spark application is running, even when no jobs are running.   This feature allows Spark to start up Tasks very quickly and to process in-memory data at speeds not possible with Hadoop.    

Once Executors are spawned on each Worker node, Spark sends the application code to them.  This application code can be JAR files or Python files, depending on how the application was coded.  Finally, the SC can send tasks for each Executor to run. 
Source: "http://stanford.edu/~rezab/sparkclass/slides/itas_workshop.pdf"

Important property of this model: no communication ever happens between Workers!


Task scheduling

The job scheduler's role is to run a user-defined job as a sequence of stages, taking into account which partitions have persisted RDDs in memory, the possibility of pipelining transformations (individual tasks) with narrow dependencies, and the need to separate transformations with wide dependencies (which require shuffling).   The scheduler also considers data locality and preferred locations.   If a task needs to process a partition available on a specific node, the scheduler will send it to that node; or, if a task processes a partition for which the containing RDD provides preferred locations (typical of an HDFS file), the scheduler will again send it to those.

For example, this "simple" job definition:

rdd1.map(splitlines).filter("ERROR")
rdd2.map(splitlines).groupBy(key)
rdd2.join(rdd1, key).take(10)

Will yield the following DAG flow:
Source: databricks (https://databricks.com/blog/2014/03/20/apache-spark-a-delight-for-developers.html)

After examining this DAG, the Scheduler defines the stages to execute so as to combine or pipeline all narrow transformations.  Here, RDD A-B would be stage 1, RDD C-D-E stage 2, and the final join stage 3.  


In summary, scheduling has these roles and responsibilities:
  • running all tasks in the graph (the "full" DAG, in proper sequence)
  • pipelining functions when possible (ex. map() and filter() transformations can be pipelined together and done in one stage, as a way to avoid processing data elements that are discarded later)
  • being cache-aware: reusing cached data and exploiting data locality
  • being partition-aware to reduce shuffle
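
To get a feel for how stages fall out of narrow vs. wide dependencies, here is a deliberately simplified Python sketch. It treats the job as a single linear chain of operation names (so it ignores jobs joining two branches, like the example above), and the set of "wide" operations is hard-coded by me; the real scheduler is far more involved.

```python
# Which operations need a shuffle (wide dependency) is hard-coded here for the
# sketch; the operation names simply mirror the ones used in this post.
WIDE = {"groupByKey", "reduceByKey", "join"}

def split_into_stages(operations):
    """Group consecutive narrow operations; a wide one starts a new stage."""
    stages, current = [], []
    for op in operations:
        if op in WIDE and current:
            stages.append(current)
            current = []
        current.append(op)
    if current:
        stages.append(current)
    return stages

job = ["map", "filter", "reduceByKey", "map", "join"]
print(split_into_stages(job))
```

The narrow map and filter end up pipelined in the same stage, while each shuffle forces a new stage boundary.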


Main Differentiation points:


The main selling points are actually Hadoop's biggest drawbacks, which led to the creation of many extensions and add-ons built around the Hadoop ecosystem.   The biggest complaints about Hadoop relate to its inefficiency at iterative computing and interactive querying (due to its high-latency dispatcher), its disk-only approach, and its complex and verbose coding (complex algorithms are only possible by chaining many individual MR jobs).  

By contrast, Spark offers:
  • An easy development environment
    • A rich API using Scala, Python or Java 
    • It leverages a real functional language, namely Scala, which is better aligned with the MR (MapReduce) programming model (they figured it would minimise the impedance mismatch with this programming model, which is not directly accessible in pre-1.8 Java).
    • Scala and Python both come with an interactive interpreter (REPL), which allows Spark to offer its own interactive shells (pyspark and spark-shell), so useful for interactive analysis. I'd say this time-saver has become the norm now. 
  • A fast platform to run
    • The general execution graph (DAG) concept is key in providing speed and lazy transformation
    • In-memory (cache) and persistent storage really allow the platform to move away from the batch-oriented paradigm towards more real-time and iterative workflows.

Versatile deployment configurations, with many possible installation options.  For example, the Cluster Manager can be deployed under these schemes:
  1. Standalone local (to run simple demo/prototyping)
  2. Standalone Cluster-based  (ex. running on same nodes as your Hadoop HDFS-based cluster) 
  3. Yarn-based 
  4. Mesos-based  


Martin

Sunday, January 11, 2015

Cassandra data modelling

This post describes the NoSQL Cassandra database solution, with a focus on the data modelling aspect.

Cassandra competes in the space of NoSQL Column Family storage engines, known under various names like Wide Column Store or BigTable-style storage, in reference to the influential Google BigTable paper.   Competing candidates include the HBase engine, a Java open source implementation of the BigTable spec.   Note: I do not use the terms Columnstore or Columnar database, as these usually imply a column-oriented DBMS like MonetDB or Vertica, whose goal is to change RDBMS data storage from row-based to column-based.

In general, Column Family solutions do not fit into the 3 common categories defined in NoSQL:
  • KeyValue Store: a Column Family does not simply store a single value per key, but rather holds many columns to handle wide data along these columns.  (Note that a K-V Store could also hold composite values, but these would be kept and treated as a single object.) 
  • Document Store: documents are usually stored in an open (as in visible) text format (JSON is the usual approach) and accessed through web-friendly languages like JavaScript and RESTful interfaces. 
  • Graph Database: a completely different paradigm is used here, following the principles of graph theory. 

The Column Family category is designed for storing and processing large amounts of structured and semi-structured data in a distributed and non-relational way.   In this sense, it could become an alternative or complementary solution within the data warehousing industry.  

I will not go through a detailed description of the Cassandra storage engine (C*); anyone interested can find plenty of online resources, discussions and other blogs (... open source should imply open community).   I will rather focus on its impact on, and support for, structural data modelling.

The first section covers some general aspects of Column Family engines, followed by various C* data modelling structures and design criteria, and finally I'll comment on how it could be a good fit (or not) for the data warehousing and BI world.

Column Family Design highlights

The NoSQL community gains attention with its schema-less sales pitch, as if data modelling had now become just a burden.   We should always be wary of such simplified conclusions: in terms of modelling, whatever effort you invest upfront will be gained back down the line.  A completely dynamic, schema-less (unknown) model cannot be sustainable and long-lived, especially for mostly read-only applications such as Business Intelligence and data warehousing!

Although Column Family platforms surfed on the NoSQL movement (fill in your own definition), they do require and benefit from some level of a priori data modelling.   But modelling should not be approached from the same angle as with a traditional relational database.   The major differences originate from these key characteristics:
    1. Row storage assumes a unique Row-key
      • Row-key is central to data storage and access, much like in any K-V storage engine
      • Row-key acts like a primary key, but without any global constraint checks enabled (shared-nothing requirement)
      • Row-key is central to horizontal scalability, obtained through data partitioning (sharding) among all distributed nodes within a cluster.  This is typically done by applying some form of hashing (ex. MD5) on the row-key.
      • Row-key can be of any type, including composite, as long as it can be hashed deterministically 
      • Row-key (or a key range) is the main predicate on which queries are run

    2. Schema is much more dynamic 
      • Forget about having to define a fixed set of columns with static type and length
      • You can still define columns for your table, but these are not mandatory for all rows
      • Collection types for a column are possible (a similar feature is found in Postgres)
      • Columns can be completely arbitrary and set dynamically by the application (column value and name)

    3. Joins are not implemented by the storage engine  
      • Joining data across tables (i.e. Column Families) requires the involvement of the application/client layer 
      • A lot of data denormalization and increased redundancy are usually needed to satisfy the different data access paths.  
      • The data model results from an analysis of data access patterns, aiming to avoid any join between Column Families (the whole data graph is available directly from the set of columns stored along each row).

    4. Columns have richer and new semantics 
      • Columns are no longer viewed only as features or attributes pertaining to each row or entity instance
      • Columns can also be viewed as data instances or values, similar to when we pivot rows into columns in a relational query 
      • Columns can grow quite extensively ... the upper limit will probably be dictated by performance or other constraints found in your application.
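
The row-key hashing idea from point 1 can be sketched in a few lines of Python. MD5 is the example hash mentioned above; the node names and the plain modulo placement are simplifications of mine (a real cluster assigns ranges of hash values to nodes rather than using a modulo).

```python
import hashlib

NODES = ["node-a", "node-b", "node-c"]  # hypothetical cluster members

def node_for(row_key, nodes=NODES):
    """Hash the row-key and map it deterministically to one node."""
    digest = hashlib.md5(row_key.encode("utf-8")).hexdigest()
    return nodes[int(digest, 16) % len(nodes)]

def composite_key(*parts):
    """A composite row-key is fine as long as it hashes deterministically."""
    return "|".join(str(p) for p in parts)

for key in ["username1", "username2", composite_key("device1", "2015-01-11")]:
    print(key, "->", node_for(key))
```

The same key always lands on the same node, which is what makes a look-up by row-key behave like an implicit index.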

Cassandra Data Architecture  

At a high level, C* data architecture is decomposed into these concepts:

Keyspace -->n  Column Family  --->n  Row --->n  ColName-ColValue-Timestamp

The Keyspace delimits your application boundary (think of a database or schema in the DBMS world) and stores the whole set of related Column Families on the cluster.   A typical cluster would have one Keyspace per application, and important global properties are defined at this level (ex. replication factor).  

A Column Family is the equivalent of a relational table: it contains columns and rows storing the data pertaining to the entity we are keeping track of.

Each Row is associated with one instance of the stored entity, identified through a mandatory and unique row-key attribute.   The most atomic storage unit is actually the triplet ColumnName--ColumnValue--Timestamp, and each row may include any number of these triplets.  The timestamp is automatically generated and is internal to C*, for managing data versioning and conflict resolution.
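
This nesting can be pictured with plain Python dictionaries. The sketch below is only a mental model, not how C* is implemented: timestamps are passed in by hand instead of being generated, and I assume the simplest conflict rule, where the newest timestamp wins.

```python
# keyspace -> column family -> row-key -> column name -> (value, timestamp)
keyspace = {"users": {}}

def write(cf, row_key, column, value, timestamp):
    """Store a (name, value, timestamp) triplet; the newest timestamp wins."""
    row = keyspace[cf].setdefault(row_key, {})
    current = row.get(column)
    if current is None or timestamp >= current[1]:
        row[column] = (value, timestamp)

def read(cf, row_key, column):
    """Return the current value of a column, or None if it was never set."""
    cell = keyspace[cf].get(row_key, {}).get(column)
    return None if cell is None else cell[0]

write("users", "username1", "Email", "u@com.com", timestamp=1)
write("users", "username1", "Email", "new@com.com", timestamp=3)
write("users", "username1", "Email", "stale@com.com", timestamp=2)  # arrives late, ignored

print(read("users", "username1", "Email"))
print(read("users", "username1", "Phone"))  # column never written for this row
```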

Let's look at some data modelling designs common in C*.

1-Simple Case:

The easiest use-case is to define a "static" set of columns within one Column-Family:

Row-Key               |  Columns
             | Email          Address    Phone       Status      
             | --------------------------------------------------
username1    | u@com.com      address1   13332       Married     
             |                                                   
             | Email          Address    Phone       Status      
             | --------------------------------------------------
username2    | u@edu.edu      address2   13333       Single      
             |                                                   
             | Email          Address    Status      
             | --------------------------------------
username3    | u3@com.com     address3   Married     
             |                                       
             | Email          Address    
             | --------------------------
username4    | u4@edu.edu     address4   
             |                           

Note that all columns are optional, and may simply be omitted, as opposed to being explicitly declared Nullable to handle missing values (as in a relational DB).   More importantly, the schema-less nature means that literally all columns could change arbitrarily from row to row.  That said, we would want some stability in the column definitions of a Column Family, in order to get some predictability in the data access pattern.

2-Wide-row Case:

Another structure is what is known as the wide row, a pattern well adapted to time series data.  Let's say we are recording, for a number of devices, values at fixed time points ... we may choose to store each device as one row, and generate columns dynamically into this wide row:

Row-Key               |  Columns
             | Date1-12:32:24  Date2-12:33:56  Date2-13:01:32    
             | --------------------------------------------------
device1      | low             low             high              
             |                                                   
             | Date8-17:04:24  Date8-18:33:55  Date8-18:41:32    
             | --------------------------------------------------
device2      | low            medium          high               
             |                                                   

Here each column name is defined by the recording timestamp, and the structure will grow dynamically to accommodate all upcoming recordings.  Although up to 2 billion columns are possible, we would limit this growth (for efficiency reasons) by bucketing the row-key, i.e. appending temporal data to it (ex.  'deviceID-day' or 'deviceID-YearMth', etc.) so that one row only stores the recordings for a limited time span.

More options are possible: we could sort the columns by descending time so as to keep the last 15 measurement points, or preserve only a limited number of measurements by using the TTL (time to live) property to automatically expire the oldest column values.
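
The bucketing trick is easy to picture in Python. In this sketch the rows live in a plain dictionary, the row-key format follows the 'deviceID-day' example above, and the helper names (bucketed_row_key, record, latest) are mine.

```python
from datetime import datetime

def bucketed_row_key(device_id, recorded_at):
    """Build a 'deviceID-day' row-key so one row holds a single day of data."""
    return f"{device_id}-{recorded_at:%Y%m%d}"

rows = {}  # row-key -> {column name (timestamp): value}

def record(device_id, recorded_at, value):
    row = rows.setdefault(bucketed_row_key(device_id, recorded_at), {})
    row[recorded_at.isoformat()] = value  # the timestamp is the column name

def latest(row_key, n):
    """Return the n most recent (column name, value) pairs, newest first."""
    row = rows.get(row_key, {})
    return sorted(row.items(), reverse=True)[:n]

record("device1", datetime(2015, 1, 10, 12, 32, 24), "low")
record("device1", datetime(2015, 1, 10, 12, 33, 56), "low")
record("device1", datetime(2015, 1, 11, 13, 1, 32), "high")

print(sorted(rows))
print(latest("device1-20150110", 1))
```

Each day gets its own row, so no single row grows without bound, and reading "the last n points of a day" stays a look-up on one row-key.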

3-Dynamic-column Case:

We can make things even more dynamic by leveraging Cassandra's support for arbitrary, application-supplied column names to store data.  In this case, we may not even need to store any value, as the column name becomes the value itself...

As an example, let's assume we have users subscribing to named services, and we wish to keep all their subscribed service names directly as columns:

Row-Key               |  Columns
             | service-1  service-Z  service-tt
             | --------------------------------
username1    |                                 
             |                                 
             | serviceEE  serviceRR  service-tt  service-A   
             | ----------------------------------------------
username2    |                                               
             |                                               
             | service-AAA
             | -----------
username3    |            
             |            
             | serviceA   serviceE
             | -------------------
username4    |                    
             |                    

No structure is given at creation time and the application provides this dynamically during runtime. This is sometimes referred to as Dynamic Column Family Design.
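
In Python terms, this design boils down to a dictionary per row whose keys carry all the information. The sketch below is only an analogy with made-up helper names; the sorted output loosely mimics columns being kept in name order.

```python
subscriptions = {}  # row-key (username) -> {column name: empty value}

def subscribe(username, service_name):
    # The column name carries the information; the value stays empty.
    subscriptions.setdefault(username, {})[service_name] = b""

def services_of(username):
    """List the column names of one row, i.e. the subscribed services."""
    return sorted(subscriptions.get(username, {}))

subscribe("username1", "service-Z")
subscribe("username1", "service-1")
subscribe("username3", "service-AAA")

print(services_of("username1"))
print(services_of("username3"))
```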


4-Collection-based column Case:

The one-to-many association between one row-key and its values does not necessarily imply the creation of multiple columns for that row.  One may leverage C* collection data types, which are equivalent to the collection types supported in most programming languages (ex. Set, List and Map).

Taking the previous username data, we could exploit these to store the many email addresses or physical addresses associated with a single user:

Row-Key               |  Columns
             | Emails                 Addresses                         
             | ---------------------------------------------------------
username1    | {u@com.com,u2@..}      {Home: address1, Work: addr2 ..}  
             |                                                               
             | Emails                 Addresses                              
             | --------------------------------------------------------------
username2    | {u@edu.edu,            {Home: addressHome, Work: addressW2,   
             |  m@m.com, perso@p.ch}   Home2: AddrHome2}                     
             |                                                               
             | Emails                 Addresses       
             | ---------------------------------------
username3    | {u3@com.com}           {Home:address3} 
             |                                        



Data model design criteria

To produce a useful data model, one needs a minimum of knowledge about how data is stored inside a C* cluster.   One important aspect has to do with clustering.  Clustering is completely symmetrical, i.e. all nodes participating in the cluster have identical characteristics:  shared nothing, independent, identical responsibilities, self-sufficient (can be added/removed on demand), with no single point of failure or contention.

Another important point is that a row, identifiable by its unique row-key, will always land on the same node (replication aside, which results in copies stored on other nodes).  The full row can then be read sequentially from file data blocks within a single node, so that random reads/writes can often be avoided.   With that in mind, here are some criteria important for performance:  
  1. Accessing data by row-key is extremely fast (data partitioning by key hashing can be viewed as an implicit index).
  2. Store as much data as possible in a single row, so that the query/access pattern involves a single logical sequential read.
  3. Data modelling must be aligned with the data access patterns, which become its driving factor!
  4. Denormalization becomes the norm, as each data access path may involve the creation of a new (redundant) Column Family.
  5. Design criteria have more to do with the read/write operations performed on the data than with coming up with clear semantics for each entity.
  6. Joining two Column Families is simply not possible.  We must fall back on client application logic, so joins are likely to be avoided in most cases.  By extension, C* does not support subqueries either.
  7. Avoid set operations or bulk reads/writes that target a large number of rows. 

C* offers rich and powerful data modelling structures, but one has to keep these criteria in mind.  This requires a shift in design thinking for anyone coming from the RDBMS world.   In summary, we'll be doing more data denormalisation and aggregation in order to:
  • Reduce Query-I/O data transfer volume 
    • (keep query and processing local!)
  • Reduce Query-Processing complexity (no JOINS)
    • (distributed systems are inherently more complex) 
But this is done at the cost of:
  • Increasing total data volume storage (disk storage is cheap)  

Any good use case for BI?

It follows that C* may not be so suited for BI.   For example consider the No Join limitation!  BI tradition applications strive for supporting many ad-hoc data access paths implying joining data all over the place.   The entire Star Schema design and the multidimensional counter part were created to guarantee good performance and scale no matter how many dimensions you'd want your metrics against.

Eliminating the possibility of Join mean to denormalize the entire fact/dimension tables structure into their constituents: all dimension hierarchical attributes flatten out alongside the metrics.  And you'd still be limited to do key look-up query-type (or at best key range lookup).  However what we need is a capacity for doing Set or Bulk read operations, involving full scan of data in order to calculate aggregate KPI or metrics!

As of now, we can see that C* is mostly an application-driven data management and storage solution targeting real-time operational applications (high availability and low latency).   Taken from the DataStax FAQ website, here are its stated main use cases:

  • Time series data management
  • High-velocity device data ingestion and analysis
  • Media streaming (e.g., music, movies)
  • Social media input and analysis
  • Online web retail (e.g., shopping carts, user transactions)
  • Web log management / analysis
  • Web click-stream analysis
  • Real-time data analytics
  • Online gaming (e.g., real-time messaging)
  • Write-intensive transaction systems
  • Buyer event analytics
  • Risk analysis and management

I can clearly see a good use case in the storage of real-time data collected by sensors or devices, i.e. time series data.   These are expected to explode with the advent of the Internet of Things.  C* could certainly be a viable platform to help us meet the Velocity and Volume requirements, the famous 2 V's of the no less famous Big Data buzz.    But from the earlier points, I'd say its analytical potential would be limited to basic metric calculations like moving average, min/max and other simple data analysis.   These assume data processing happening at row level as opposed to across rows!   For any analytical need crossing/joining over many dimensions, you'll need to export your data to another, more specialised analytical platform!
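
As an illustration of what "row-level" processing means here, the sketch below computes a moving average and min/max over the readings of a single sensor, all assumed to live in one row.  It is plain Python, not C* code; the sensor_row dictionary and its timestamps are made-up sample data.

```python
from collections import deque


def moving_average(values, window):
    """Simple moving average over consecutive values of one row."""
    if window <= 0:
        raise ValueError("window must be positive")
    buf = deque(maxlen=window)
    running = 0.0
    out = []
    for v in values:
        if len(buf) == window:
            running -= buf[0]  # value about to be pushed out of the window
        buf.append(v)
        running += v
        if len(buf) == window:
            out.append(running / window)
    return out


# Hypothetical single row: column name = timestamp, value = reading.
sensor_row = {
    "2014-01-01T00:00": 20.0,
    "2014-01-01T00:01": 22.0,
    "2014-01-01T00:02": 24.0,
    "2014-01-01T00:03": 26.0,
}

# Columns are read in timestamp order, everything stays inside one row.
readings = [value for _, value in sorted(sensor_row.items())]
averages = moving_average(readings, window=2)
lowest, highest = min(readings), max(readings)
```

Nothing in this calculation needs data from another row, which is why this kind of metric fits the model; anything comparing many sensors against many dimensions does not.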

Another use case is possible if we leverage the wide row and dynamic column aspects.  It is frequent in BI that we need to compute and preserve a large number of metrics and KPIs, per business entity, calculated over some period.   For example, the customer-centric view (typical of CRM) needs a large number of metrics measuring historical behavior/performance for each customer.   Or product portfolio analysis monitoring key indicators of profitability and popularity per product.   These applications involve flattened and dynamic data, AKA a flat KPI/metrics structure.  Any RDBMS implementation will be limited in one form or another due to the static and fixed nature of its data schema.  One can use generically named columns, but guessing which data types will be needed in the future can be challenging.  One could also build some metadata-driven engine to take care of creating/deleting metrics on demand, but this will pose issues with timelines, historical data and storage.

In this sense, I see C* as offering a versatile and accommodating platform to deal with the dynamic nature of columns.    We can easily expand our list of KPIs to the thousands or more, seamlessly and elegantly!   We could choose different physical designs for the row grain definition, by choosing different levels for the row-key:


1- row-key = customerID (columns = PeriodId_kpiInd, ...)
 and keep only a few years worth of data with TTL

Row-Key               |  Columns
             | 2014-01:Kpi1  2014-01:Kpi2 ... 2014-02:Kpi1 ....               
             ---------------------------------------------------------
customer1    | 1234.76       32           ... 1657.43      ....         
             |                                                          
             | 2014-01:Kpi1  2014-01:Kpi2 ... 2014-02:Kpi1 ....               
             ---------------------------------------------------------
customer2    | 1234.76       32           ... 1657.43      ....         
             |                                                          

That way, we have all KPIs belonging to one customer stored in a single wide row, and can leverage sequential reads for doing YTD or Year-to-Year comparison calculations.  We could also leverage the TTL attribute to only keep a limited number of years of historical data, and let C* manage the purging automatically.
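
A rough model of this first layout in plain Python (not a C* API): each customer row is a list of (column name, value) pairs kept sorted by column name, so a YTD calculation becomes one contiguous slice of a single row.  The extra 2013-12 value and the helper names column_slice and ytd are assumptions added for the example.

```python
from bisect import bisect_left

# One wide row per customer; columns named "<period>:<kpi>" and kept sorted.
wide_rows = {
    "customer1": sorted({
        "2013-12:Kpi1": 900.0,   # made-up value, outside the 2014 slice
        "2014-01:Kpi1": 1234.76,
        "2014-01:Kpi2": 32,
        "2014-02:Kpi1": 1657.43,
    }.items()),
}


def column_slice(row, start, end):
    """Columns with start <= name < end: one contiguous read of a sorted row."""
    names = [name for name, _ in row]
    return row[bisect_left(names, start):bisect_left(names, end)]


def ytd(row, year, kpi):
    """Sum one KPI over all periods of the given year, within a single row."""
    columns = column_slice(row, f"{year}-01", f"{year + 1}-01")
    return sum(value for name, value in columns if name.endswith(":" + kpi))


kpi1_2014 = ytd(wide_rows["customer1"], 2014, "Kpi1")
```

The whole calculation touches one row-key only, which is the point of choosing customerID as the row grain.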


2- row-key = customerID-PeriodId (columns = kpiInd, ...)

Row-Key               |  Columns
             | Kpi1          Kpi2 ...   KpiN        ...      
             ----------------------------------------
c1:2014-01   | 1234.76       32   ...   1656557.43  ...
             |                                         
             | Kpi1          Kpi2 ...   KpiN        ...      
             ----------------------------------------
c1:2014-02   | 1657.43       37         1657921.22  ...
             |                                         

Here, we'd end up with much narrower rows, but would need to cross different row-keys to do customer longitudinal analysis like Year-to-Year or other KPI comparisons throughout the customer lifetime.
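
The same kind of sketch for this second layout, again in plain Python with made-up data (the 2013-01 row and the year_over_year helper are assumptions for the example).  The row-key is modelled as a (customer, period) tuple, and a Year-to-Year comparison now needs one lookup per period.

```python
# Narrow rows keyed by (customerID, PeriodId); columns are the KPI names.
narrow_rows = {
    ("c1", "2013-01"): {"Kpi1": 1000.0, "Kpi2": 30},   # made-up prior year
    ("c1", "2014-01"): {"Kpi1": 1234.76, "Kpi2": 32},
    ("c1", "2014-02"): {"Kpi1": 1657.43, "Kpi2": 37},
}


def year_over_year(rows, customer, year, month, kpi):
    """Difference of one KPI between the same month of two consecutive years.

    Two distinct row-keys are read: the comparison crosses rows.
    """
    current = rows[(customer, f"{year}-{month:02d}")][kpi]
    previous = rows[(customer, f"{year - 1}-{month:02d}")][kpi]
    return current - previous


delta = year_over_year(narrow_rows, "c1", 2014, 1, "Kpi1")
```

Each period read is cheap on its own, but the number of row-keys to fetch grows with the length of the history being compared.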

One could think of alternative ways of storing these flattened KPIs/metrics, but again, this will not be suited to set-based and cross-entity pattern analysis.    Rather, it should be thought of as a flexible storage mechanism with limited potential for fine-grained analysis or one-to-one entity comparison.

In summary, I see a few cases where we could make use of C* for providing BI-related functionality; however, it is surely not a suitable storage solution when one is looking for a generic and global platform!


Martin

Wednesday, May 08, 2013

NoSQL: how disruptive for BI?


It seems the "NoSQL" term is now seen and used everywhere.  This post discusses this new technology, along with its impact on the BI world.


Definition

Trying to find a clear and consensual definition is always a challenge with over-hyped buzzwords that are used more as a marketing advantage.  Recently, well-respected authors Martin Fowler and Pramod Sadalage wrote a clear and informative book on the full spectrum of NoSQL technologies called "NoSQL Distilled: a brief guide to the emerging world of polyglot persistence".

According to the book's authors, no well-accepted definition exists for NoSQL; the original name from the NoSQL Meetup was "open source distributed nonrelational databases".  Rather than settling on a definition, the authors focus on a few points common to NoSQL databases:

  • Often run off commodity cluster computing
  • Support of large or extremely large data volumes on clusters
  • Query language is not necessarily SQL (usually a close derivative language is used)
  • Usually Open-source and prominent in Web-based applications ecosystems
  • Consistency does not rely on relational ACID principles (an effect of running on independent cluster nodes)
  • Schema-less, i.e. no static data model structure is mandatory

What about Cluster-based Solutions proposed by Relational vendors?
The commodity cluster model leads to a much cheaper and more scalable solution (scaling out with many small nodes) than the scale-up alternative (an ever larger, monolithic and expensive server).  The cluster version of relational databases (ex. Oracle RAC or Microsoft SQL-server) is not designed to be run on multiple independent nodes, as it actually relies on a common and shared disk subsystem.   Their model leads to fast and performant throughput, with a cluster-aware filesystem writing to a single highly available disk subsystem.   This single point of failure is avoided in a robust commodity cluster, where a server node may go down (both server hardware internals & disk) without impacting the cluster's on-going execution (full failover and high availability are guaranteed).


NoSQL database:  Metamodel classification 

The authors proposed a neat and convenient way to classify current NoSQL databases according to how they store data, i.e. the metamodel used.   The relational metamodel is based on relations (tables) and tuples (rows), whereas current NoSQL databases use these forms instead:

      1. Key-value: Data stored at Aggregation* level using an "opaque" form
        • Each aggregate has a key or ID to get at the data
        • Each aggregate is opaque as data is stored using unstructured blob
        • (+) can be used to store whatever we like, with just a constraint on size
        • (-) less flexible in querying & updating (cannot be done on part of a value) 
      2. Document: Data stored at Aggregation level using a visible and explicit semi-structured form (typical of JSON)
        • Each aggregate may have a key or ID to get at the data, but not necessarily, as queries can be done on any field within the aggregate
        • Each aggregate is stored within a defined structure, imposing a constraint on what data is allowed
        • (+) more flexible in querying (part of a value can be queried or updated)
      3. Columnar: Data is stored using a two-level map with grouping of columns including all rows.
        • Data is identified with a row-ID followed by a column-name normally part of a column-family
        • Query sequence pattern:  Row-key -->  Column-key --> "values"
        • (+) optimal with read-oriented app (ex. BI)
        • (-) not ideal for writing-oriented app 
      4. Graph: Special database using graph metamodel (i.e. nodes & edges) to store records with complex interconnections
        • Each entity or node can be queried in relation to interconnected nodes, typical of social networks, product preferences..
        • (+) querying is optimal compared to the relational model, which requires FK relationships and expensive join operations
        • (-) focus on querying results in more complex & less performant data insertion 
        • (-) less suited with storage across distributed cluster nodes 

*The term aggregation is used to refer to a data unit more complex than just a list of values or tuples.  Typically, the data unit consists of a complex set of records with associations and nested structures, all of which must be manipulated atomically (ex. think about one customer with all related addresses, or an Order with all item details, payment info and customer attached to it).
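
To make the first three metamodels more tangible, here is a toy sketch storing the same customer aggregate in three ways.  It uses plain Python dictionaries only, no real NoSQL product; the store names, keys and column names are invented for the example, and real stores differ in many details.

```python
import json

# The aggregate: one customer with its nested addresses.
customer = {"name": "Alice", "addresses": [{"city": "Geneva"}, {"city": "Bern"}]}

# 1. Key-value: the aggregate is an opaque blob behind a key.
kv_store = {"cust:1": json.dumps(customer).encode("utf-8")}

# 2. Document: the aggregate keeps a visible structure that can be queried.
doc_store = {"cust:1": customer}

# 3. Columnar: two-level map, row-key then column name (prefixed by a family).
column_store = {
    "cust:1": {
        "profile:name": "Alice",
        "address:0:city": "Geneva",
        "address:1:city": "Bern",
    }
}


def kv_get_name(store, key):
    """Key-value: fetch the whole blob by key, then decode it client-side."""
    return json.loads(store[key].decode("utf-8"))["name"]


def doc_find_by_city(store, city):
    """Document: query on a field inside the aggregate, no key required."""
    return [
        key for key, doc in store.items()
        if any(address["city"] == city for address in doc["addresses"])
    ]


def column_get(store, row_key, column):
    """Columnar: Row-key --> Column-key --> value."""
    return store[row_key][column]
```

The key-value store cannot answer doc_find_by_city without decoding every blob, which is the "less flexible in querying" point made above.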


Technologies

Looking at the impressive number of open source implementations available (the list below names just a few of the 150 databases currently listed here), we have an indication of the potential growth of this technology, but also an indication of its maturity level... only a few important implementations are likely to survive!

A lot of NoSQL database stores may not have a clear-cut storage metamodel fitting exactly the metamodel taxonomy of M. Fowler et al.  So depending on interpretation, we may find some stores in various categories; please refer to the NoSQL online reference for other interpretations and up-to-date data on this active field.

    1. Key-value Store: 
      • Redis
      • Voldemort
      • Oracle BDB 
      • Amazon SimpleDB 
      • Riak
    2. Document Store:
      • Apache CouchDB
      • MongoDB
      • BaseX
    3. Columnar or Column store: 
      • Cassandra
      • HBase (Hadoop)
      • HyperTable 
      • Apache Accumulo
      • MonetDB
    4. Graph Database: 
      • Neo4J
      • InfoGrid
      • Infinite Graph



BI perspective

I consider Graph databases and Column-stores to have the greatest impact on BI, and to have the potential of being disruptive to the current BI market.

For Column-stores, this is easily understandable, as BI is a lot about scanning and other read-only data access where often just a few fields are fetched out of a much wider and flat record.  As a testimony to this, we can look at the number of commercial vendors purchasing existing solutions and/or integrating column-store into their own platform:

    • Sybase IQ, now part of SAP, probably one of the original pioneers of column-store on the commercial scene
    • Vertica (purchased by HP), the 5-year-old vendor specialized in and fully dedicated to column-store
    • Greenplum (purchased by EMC) with their "polymorphic" data storage
    • Paraccel (very recently purchased by Actian), combining column-store with an in-memory engine
    • Oracle, with a twist that they refer to as Hybrid columnar (rows still being used)
    • Microsoft SQL-Server, proposing a compromise approach called "columnstore index" where data update is no longer permitted!

As for Graph databases, I think great opportunities lie in mining the relationships and interactions contained explicitly or implicitly within the graph content.  A lot of research is currently being done aiming at providing BI-style analysis against Graph-based data.  Here are two great examples:  http://www.edbt.org/Proceedings/2012-Berlin/papers/workshops/beweb2012/a3-bleco.pdf or else http://perso.ecp.fr/~aufaurema/Graphs%20for%20BI%20(printable).pdf


Martin