At Prosimo, we are solving the problem of providing secure, reliable, and optimized multi-cloud networking between users across the globe and enterprise applications present in one or more clouds (public or private). The Prosimo stack combines cloud networking, performance, security, observability, and cost management—all powered by data insights and machine learning models with autonomous cloud networking to reduce complexity and risk. The source of this visibility is data generated by our AXI endpoints resulting from usage of the Prosimo fabric, leading to generation, processing, and storage of logs in large volume and velocity.
For the purpose of analytics (and for Prosimo in general), necessary logs and metadata of such transactions need to be recorded, without storing the actual data payload sent in each transaction. Even so, a normal record can be a few kilobytes of data, which in turn can result in gigabytes of data in a day for a typical Prosimo deployment. Furthermore, analytics needs to be done in near real-time to achieve these customer outcomes:
- User-level and application-level insights with information such as the user’s geolocation, application experience, amount of data sent/received, etc.
- Monitoring of network health in terms of latency, packet loss, and application experience.
- Evaluation of security posture or risk score for the user in real time, and the ability to take necessary actions depending on enterprise security policies; for example, to trigger an MFA challenge or to lock a user out of enterprise apps until cleared by an admin.
These requirements around volume and velocity of data have led the Prosimo Data Science (DS) team to design a scalable, robust, and flexible analytics pipeline. We wanted to share some of our learnings with the DS community as they look to solve challenges in their own environment.
A typical data science pipeline
The following diagram shows a typical analytics pipeline and its components that carry out these functions:
- Performing data capture at the source
- Shipping data to our ETL streaming analytics engine via a robust and scalable message bus cluster
- Running stream and batch analytics jobs
- Maintaining a data warehouse to store processed and enriched analytics data
- Generating real-time alerts in case of any anomalies in system or user behavior
Before diving into each of the individual blocks, we want to point out the following guiding principles that we think should be adopted for designing any analytics pipeline when dealing with a large enterprise network. The analytics pipeline should be:
- Easy to scale, so it can handle an ever-increasing volume of data driven by growth of the enterprise network
- Able to leverage existing proven components
- Flexible to accommodate the addition, removal, or replacement of any particular block in the pipeline, without compromising functionality and performance
- Resilient to failures
- Able to provide continuous monitoring and alerting integration
- Capable of supporting both streaming and batch analytics
- Cost efficient, so it can stay within an operational cost budget
- Backed by an active developer community
Based on our learnings, we want to share five building blocks we believe will help others build a scalable and resilient analytics pipeline.
1. Log shipper
The log shipper is a component that resides within the log-generating endpoint, continuously watches various log files, and ships necessary metadata and metrics for telemetry to one or more output destinations.
Design considerations:
- A lightweight log shipper that doesn’t consume large amounts of CPU and memory. This is extremely important in today’s microservices world, where logs are emitted by multiple containers running in a Kubernetes environment.
- Ability to tail or watch multiple container log files.
- Simple configuration knobs for necessary features like handling backpressure, memory management, retries on failure, etc.
- Configurable support for fault tolerance in terms of saving state in some kind of DB so that it can resume reading from where it left off after restart.
- Simple to configure or write custom filters to enrich each log (add, remove, replace) with desired information and metadata.
- Support for multiple output destinations, such as files; message bus systems such as Kafka and Google Pub/Sub; and a variety of databases, analytics engines, and log collectors such as Elasticsearch, Splunk, PostgreSQL, Amazon S3, and Loki.
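A minimal sketch of the checkpointing and enrichment considerations above, using only the Python standard library. The function names (`ship_new_lines`, `load_offset`, `save_offset`), the JSON state file, and the `enrich` and `send` callbacks are hypothetical and do not reflect how Fluent Bit or any other shipper is implemented.

```python
import json
import os
from typing import Callable, Dict

Record = Dict[str, object]


def load_offset(state_path: str) -> int:
    """Return the byte offset saved by a previous run, or 0 on first start."""
    if not os.path.exists(state_path):
        return 0
    with open(state_path, "r", encoding="utf-8") as f:
        return int(json.load(f)["offset"])


def save_offset(state_path: str, offset: int) -> None:
    """Write the offset to a temp file, then rename it over the old state."""
    tmp_path = state_path + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump({"offset": offset}, f)
    os.replace(tmp_path, state_path)


def ship_new_lines(
    log_path: str,
    state_path: str,
    enrich: Callable[[Record], Record],
    send: Callable[[Record], None],
) -> int:
    """Ship every complete line written since the saved offset.

    Returns the number of records shipped in this pass.
    """
    offset = load_offset(state_path)
    shipped = 0
    with open(log_path, "rb") as f:
        f.seek(offset)
        while True:
            line = f.readline()
            if not line.endswith(b"\n"):
                break  # end of file, or a line that is still being written
            record = enrich({"message": line.decode("utf-8").rstrip("\n")})
            send(record)  # if this raises, the offset below is not advanced
            offset = f.tell()
            save_offset(state_path, offset)  # per-line checkpoint: simple, not fast
            shipped += 1
    return shipped
```

Because the offset is saved only after `send` returns, a crash between the two steps causes that line to be sent again on restart, so this sketch gives at-least-once delivery rather than exactly-once.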
There are many excellent log shippers available in the market, but based on the above design considerations, we would like to call out the following four log shippers:
- Fluent Bit
- Fluentd
- Filebeat
- Logstash
All of these log shippers are reasonably lightweight in terms of resources and support some or all of the features listed above. Filebeat and Logstash are very popular when you have an ELK (Elasticsearch, Logstash, Kibana) based logging infrastructure; otherwise, Fluent Bit and Fluentd are excellent choices. If you have to choose between Fluent Bit and Fluentd, we recommend Fluent Bit because its smaller resource footprint makes it a better fit for our previously listed design considerations.
We also recommend that you read the following comparison articles:
2. Message bus system
The message bus is the messaging infrastructure for asynchronous communication between the different components of the product. It is also responsible for transferring logs emitted by multiple log shippers at various endpoints to the ETL jobs.
Design considerations:
- Highly scalable distributed cluster design to support multiple parallel message bus clients, aka producers (clients publishing messages) and consumers (clients consuming messages).
- Configurable capabilities such as message partitions (for achieving horizontal scaling per message type) and replication factors (message copies reside in more machines, thereby making it reliable, resilient, and fault tolerant).
- High throughput and low latency (in milliseconds) for real-time streaming of logs.
- Simple to expand an existing cluster by adding more machines without incurring any downtime.
- Not just a message bus: Retains messages for a configurable number of days so that it also behaves somewhat like a database, which makes it ideal for supporting new consumers that need to read old messages. For example, the messages might be a list of configurations that must be made available to any new consumer of this message type.
- Guaranteed message delivery with “at least once” semantics by default, as well as support for “exactly once” semantics.
- Support for key-based messaging, thereby making it possible for our consumers to read certain messages in order.
- Rich with features such as message compaction (for key-based messages, where during cleanup the message bus retains only the latest messages per key, thereby saving space and also making it easier for new consumers to pick up fewer messages) and message compression.
- Support for both authentication and authorization.
- A mirroring tool is included to easily replicate or transfer data from one cluster to another for HA.
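Two of the considerations above, key-based ordering and message compaction, can be sketched in a few lines of plain Python. This is an illustration under simplifying assumptions, not how any particular message bus implements them: partitions are in-memory lists, CRC32 stands in for whatever hash a real client uses, and the names `partition_for`, `publish`, and `compact` are hypothetical.

```python
import zlib
from typing import Dict, List, Tuple

Message = Tuple[str, str]  # (key, value)


def partition_for(key: str, num_partitions: int) -> int:
    """Map a key to a partition; the same key always maps to the same one."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def publish(partitions: List[List[Message]], key: str, value: str) -> int:
    """Append a keyed message to its partition and return the partition index."""
    index = partition_for(key, len(partitions))
    partitions[index].append((key, value))
    return index


def compact(log: List[Message]) -> List[Message]:
    """Keep only the latest message per key, preserving the survivors' order."""
    last_index: Dict[str, int] = {}
    for i, (key, _) in enumerate(log):
        last_index[key] = i
    return [msg for i, msg in enumerate(log) if last_index[msg[0]] == i]
```

Since all messages for one key land in one partition, a consumer reading that partition sees them in publish order. After compaction, a new consumer only has to read one message per key.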
When it comes to fast, reliable, and highly scalable message buses, Kafka has become an industry gold standard, not only for transferring huge volumes of data from producers to consumers but also for acting as a data store with a tunable retention period per message type. In fact, we found it was a best fit for our previously listed design considerations.
Comparison with other systems:
Google’s cloud offering, Pub/Sub, is another great choice for a message bus, especially when producers and consumers are running in GCP in the form of cloud functions, app engine apps, etc. It is very simple to configure with various bells and whistles and to get up and running in no time.
One stark difference between Kafka and Pub/Sub is that Pub/Sub provides both push and pull mechanisms for its consumers (subscribers), whereas Kafka supports only pull mechanisms for its consumers.
3. ETL jobs
ETL (Extract, Transform, and Load) jobs are a set of functions or methods that are written to extract a huge volume of data from the message bus and other sources with minimum latency, perform various transformations, and load processed/enriched data to desired output data stores.
When you start planning for these ETL jobs in your analytics pipeline, three questions need to be answered with regard to incoming data:
- Where to process
- What to process
- How to process
There are some excellent frameworks in the market for answering “Where to process” questions, viz Spark, Flink, etc., which are known for fast data processing. However, they all come with an additional onus of actively managing the clusters where these frameworks get deployed. There is an extra operational cost for constantly monitoring worker nodes/VMs and their health, scaling up/down workers based on load, etc. If you are in an early-stage start-up or working on a fast-paced, time-critical project you want to bring out as soon as possible, it becomes extremely critical to focus most of your effort and time on “What to process” and “How to process,” rather than worrying too much about “Where to process.”
Design considerations:
- Serverless solution; i.e., fully managed cloud service for executing different kinds of ETL jobs.
- No extra operations cost of managing clusters where these jobs get deployed.
- Support for multiple machine types for deploying ETL jobs based on CPU and memory.
- Support for autoscaling worker nodes of clusters based on a number of factors, such as input size, current throughput, amount of data backlog, CPU usage, etc.
- Performance: It is quite common to have different types of message bus systems and input/output data stores depending on use case, scale, and ease of deployment. One of the basic requirements for these ETL jobs is to have capabilities to work with a variety of such systems and yet provide the same performance in terms of latency and throughput.
- Support for both batch and streaming jobs.
- Rich user-friendly GUI to view ETL job graph, its running state, current data freshness, system latency, CPU/memory usage per transform, current number of running workers, etc.
- Support for creating and generating custom alerts in case of any malfunction of any stage in the ETL job or the cluster where the ETL job is deployed.
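To make the autoscaling consideration concrete, here is a hypothetical sizing heuristic based on input rate and backlog. It is an assumption made for illustration only and is not the algorithm used by Dataflow or any other managed service; the function name and all parameters are invented for this sketch.

```python
import math


def desired_workers(
    backlog_records: int,
    input_records_per_sec: float,
    records_per_sec_per_worker: float,
    target_drain_sec: float,
    min_workers: int = 1,
    max_workers: int = 10,
) -> int:
    """Estimate how many workers keep up with input and drain the backlog.

    The required rate is the incoming rate plus the rate needed to clear
    the current backlog within target_drain_sec.
    """
    if records_per_sec_per_worker <= 0 or target_drain_sec <= 0:
        raise ValueError("per-worker rate and drain target must be positive")
    required_rate = input_records_per_sec + backlog_records / target_drain_sec
    workers = math.ceil(required_rate / records_per_sec_per_worker)
    # Clamp to the configured bounds so cost stays predictable.
    return max(min_workers, min(max_workers, workers))
```

For example, with 1,000 records per second arriving, a backlog of 60,000 records, a 60-second drain target, and workers that each handle 500 records per second, the heuristic asks for 4 workers.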
Writing and running ETL jobs in Spark or Flink clusters has been a popular choice among data engineers, but as mentioned previously, they all come with the additional operational overhead of continuously managing these clusters. Based on the above design considerations, our preferred choice is Dataflow.
Dataflow is Google’s fully managed cloud service for running ETL jobs. Pipeline code that runs on Dataflow is built using one of the Apache Beam SDKs (Java, Python, or Go; Scala is supported through the separate Scio library). Apache Beam is an open source, unified programming model for defining the ETL pipeline in any of the supported languages of our choice. One important note here is that, even though we are running Apache Beam jobs on Dataflow, the same code (perhaps with some minor tweaks) can be deployed on other supported distributed data processing engines, aka “Beam pipeline runners,” such as Spark, Flink, and Samza. Basically, you write code once and then deploy it on any of the supported runners.
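The shape of such a job can be sketched without any runner. The example below is plain Python, not Apache Beam code, and it assumes a hypothetical record format (JSON with `ts`, `user`, and `bytes` fields) and a 60-second fixed window. In a Beam pipeline, the same three stages would be expressed as a source, a windowed per-key aggregation, and a sink.

```python
import json
from collections import defaultdict
from typing import Dict, Iterable, Iterator, List, Tuple

WINDOW_SEC = 60  # hypothetical fixed window size


def extract(raw_messages: Iterable[str]) -> Iterator[dict]:
    """Parse raw messages, skipping anything malformed or incomplete."""
    for raw in raw_messages:
        try:
            record = json.loads(raw)
        except json.JSONDecodeError:
            continue
        if isinstance(record, dict) and {"ts", "user", "bytes"} <= record.keys():
            yield record


def transform(records: Iterable[dict]) -> Dict[Tuple[int, str], int]:
    """Sum bytes per (window start, user) using fixed, non-overlapping windows."""
    totals: Dict[Tuple[int, str], int] = defaultdict(int)
    for record in records:
        window_start = int(record["ts"]) // WINDOW_SEC * WINDOW_SEC
        totals[(window_start, record["user"])] += int(record["bytes"])
    return dict(totals)


def load(totals: Dict[Tuple[int, str], int], sink: List[dict]) -> None:
    """Append one enriched row per (window, user) to the output store."""
    for (window_start, user), total in sorted(totals.items()):
        sink.append({"window_start": window_start, "user": user, "bytes": total})
```

Keeping the three stages as separate functions mirrors the “What to process” and “How to process” split: the transform can be tested on its own, independent of where the job eventually runs.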
4. Data warehouse
A data warehouse is a set of data stores for saving processed/enriched data emitted by ETL jobs. For any enterprise network, the selection of a data store is based on the use case that the particular set of data is trying to solve. Based on our analysis, at least the following three types of data stores need to be set up for an enterprise network.
A. Data store for dashboard
A dashboard that provides real-time network visibility is very important for any network admin or Ops team. There is an ever-increasing demand for creating rich GUIs full of data insights with various bells and whistles, such as text-based filtering, date-based filtering, partial or full-text search, and representation of data via different charts, viz Sankey, bar, pie, candlestick, etc.
Design considerations:
• Able to handle huge amounts of data ingestion and support multiple simultaneous read and write operations.
• Easily scalable without much downtime.
• Capability to perform fast searches (free-flow text search; i.e., both partial and full-text search), since search is one of the most frequently used operations on any data dashboard.
• Some analytics like aggregations with buckets, filtering, etc., to produce time series data.
Elasticsearch handles all four of the requirements listed above very well.
Not only can it support a huge amount of data ingestion (hundreds of gigabytes of data), but it is also optimized for fast search operations across large data sets. Various kinds of concurrent searches, fuzzy queries, auto-completion of search terms, etc., can all be done with very low latency since Elasticsearch is a search engine based on the Lucene library. Another advantage of using Elasticsearch is its capability to do aggregations for producing time series data for admin dashboards.
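As an illustration of the aggregation capability, the sketch below builds an Elasticsearch request body that filters on one user and sums bytes per minute with a `date_histogram` aggregation. The field names (`user`, `@timestamp`, `bytes_sent`) and the helper names are assumptions, the `user` field is assumed to be mapped as a keyword, and the `fixed_interval` parameter requires a reasonably recent Elasticsearch version.

```python
import json
from typing import Any, Dict


def bytes_per_minute_query(user: str, since: str = "now-1h") -> Dict[str, Any]:
    """Build a search body: filter by user and time, then bucket by minute."""
    return {
        "size": 0,  # only the aggregation buckets are needed, not the hits
        "query": {
            "bool": {
                "filter": [
                    {"term": {"user": user}},
                    {"range": {"@timestamp": {"gte": since}}},
                ]
            }
        },
        "aggs": {
            "per_minute": {
                "date_histogram": {"field": "@timestamp", "fixed_interval": "1m"},
                "aggs": {"bytes_sent": {"sum": {"field": "bytes_sent"}}},
            }
        },
    }


def to_request_body(query: Dict[str, Any]) -> str:
    """Serialize the query so it can be sent to the search endpoint."""
    return json.dumps(query)
```

Each bucket in the response corresponds to one point on a time series chart, which is why this kind of query maps naturally onto a dashboard widget.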
Apache Pinot is another great fit for this type of data store. Unlike Elasticsearch, it is not built for search; rather, it is built for performing OLAP (OnLine Analytical Processing) queries with low latency, making it a very good fit for real-time analytics. It uses the Star-Tree index, which is like a pre-aggregated value store of all combinations of all dimensions of the data. Apache Pinot uses these pre-aggregated values to provide very low-latency, real-time analytics on the data.
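The idea of answering queries from pre-aggregated values can be sketched as follows. This is a deliberately simplified model that materializes every combination of dimension values, with `*` meaning "any value"; it is not Pinot's actual Star-Tree data structure, which is a tree and does not need to materialize everything. The function names and sample dimensions are hypothetical.

```python
from collections import defaultdict
from itertools import product
from typing import Dict, Iterable, Sequence, Tuple

STAR = "*"  # wildcard: aggregate over every value of this dimension


def build_preaggregates(
    rows: Iterable[dict], dimensions: Sequence[str], metric: str
) -> Dict[Tuple[str, ...], int]:
    """Pre-compute the metric sum for every combination of dimension values."""
    cube: Dict[Tuple[str, ...], int] = defaultdict(int)
    for row in rows:
        # Each dimension contributes either its concrete value or the wildcard.
        choices = [(row[d], STAR) for d in dimensions]
        for combo in product(*choices):
            cube[combo] += row[metric]
    return dict(cube)


def query(cube: Dict[Tuple[str, ...], int], dimensions: Sequence[str], **filters: str) -> int:
    """Answer a filtered sum with a single lookup instead of a scan."""
    key = tuple(filters.get(d, STAR) for d in dimensions)
    return cube.get(key, 0)
```

The trade-off is visible even in this toy version: each row is written to 2^d entries for d dimensions, so reads become lookups at the cost of extra storage and ingestion work.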
Note that SQL-based RDBMS systems like MySQL/PostgreSQL or NoSQL databases like MongoDB might also be an option here and both are quite good when it comes to handling scale. However, when it comes to creating dashboards providing user-level analytics, time series charts, and n-gram types of searches across millions of records, we recommend Elasticsearch or Apache Pinot types of systems.
B. Data store for ML engines
Telemetry data emitted in enterprise networks can result in hundreds of gigabytes or petabytes of data. Selecting an appropriate data store for storing such a huge volume of data accompanied with fast querying support is one of the most important requirements for ML engines. We would prefer using serverless solutions here so that the entire focus can be on getting rich and meaningful insights out of data using known query languages like SQL.
Design considerations:
• No-ops, serverless solution to store and process hundreds of gigabytes to petabytes of data at a relatively low cost.
• Support for fast querying of large data sets, preferably using SQL-like queries since SQL is the most preferred and commonly used language among data scientists and data engineers.
• Rich API client libraries in various languages, such as Java, Python, Go, etc.
• Support for injecting streaming data into tables via ETL jobs.
• Built-in support for automatic data replication for disaster recovery and high availability.
• Support for clustering and partitioning of tables, not only to help save on costs of storing old data, but also to optimize certain kinds of SQL queries.
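Why partitioning helps certain queries can be shown with a toy in-memory table partitioned by day. This is only a conceptual sketch with hypothetical names (`DayPartitionedTable`, `scan`); a real warehouse does the equivalent pruning inside its query engine.

```python
from collections import defaultdict
from datetime import date, timedelta
from typing import Dict, List, Tuple


class DayPartitionedTable:
    """Rows are grouped by day so a date-bounded query can skip other days."""

    def __init__(self) -> None:
        self._partitions: Dict[date, List[dict]] = defaultdict(list)

    def insert(self, day: date, row: dict) -> None:
        self._partitions[day].append(row)

    def scan(self, start: date, end: date) -> Tuple[List[dict], int]:
        """Return rows with start <= day <= end and the number of partitions read."""
        rows: List[dict] = []
        partitions_read = 0
        day = start
        while day <= end:
            if day in self._partitions:  # membership test does not create a key
                rows.extend(self._partitions[day])
                partitions_read += 1
            day += timedelta(days=1)
        return rows, partitions_read
```

A query restricted to one week touches at most seven partitions regardless of how many years of history the table holds, which is where both the speed and the cost savings come from.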
BigQuery is a great choice to use as a data store if you want to keep your data in GCP and your ML engine also runs in GCP. Features like BigQuery ML help data scientists build and execute ML models directly inside BigQuery using SQL. Additionally, its analytics functions are a plus.
A great alternative to BigQuery is Amazon Athena if you want to run your ML engine on AWS and your data set resides in Amazon S3. Like BigQuery, Athena is a fully managed serverless solution that supports fast SQL querying of large data sets.
C. Data store for data backup
Apart from having data stores for dashboards and ML engines, one should always plan to maintain an additional data store for saving relevant data (both raw and enriched) for different needs such as data backup, on-demand debugging, and cold storage for historical data. It need not be a fancy data store with bells and whistles like Elasticsearch, BigQuery, or Apache Pinot; a relatively low-cost option is enough, as long as it can save data in the formats of our choice with very high annual durability (object stores such as Amazon S3 and Google Cloud Storage are designed for 99.999999999 percent).
Design considerations:
- Support for storing and retrieving huge or, in fact, any amounts of data.
- Multiregional support for saving data, with an easy way to specify the region of our choice based on compliance or any other enterprise policy.
- Easy-to-configure access and security rules and policies.
- Support for optimizing cost with the help of storage classes; i.e., it should be possible to automatically move data to lower-cost storage (storage-class) by configuring a data storage bucket with Object Lifecycle.
- Support for various data formats such as Avro, Parquet, JSON, CSV, etc.
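The storage-class consideration above boils down to a rule that maps an object's age to a cheaper class. The sketch below uses Google Cloud Storage class names, but the age thresholds are hypothetical and the function only models the decision; in practice the rule is configured on the bucket and applied by the storage service itself.

```python
from datetime import date

# (minimum age in days, storage class), checked from oldest to newest.
# The thresholds are illustrative assumptions, not recommended values.
LIFECYCLE_RULES = [(365, "ARCHIVE"), (90, "COLDLINE"), (30, "NEARLINE")]


def storage_class_for(created: date, today: date) -> str:
    """Return the storage class an object of this age should live in."""
    age_days = (today - created).days
    for min_age_days, storage_class in LIFECYCLE_RULES:
        if age_days >= min_age_days:
            return storage_class
    return "STANDARD"
```

With these thresholds, an object created on January 1 stays in STANDARD for its first month, moves to NEARLINE after 30 days, and ends up in ARCHIVE after a year.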
Both Amazon S3 and Google Cloud Storage are great options to use as a data store here, depending on the cloud of your choice and where your data resides.
5. Logging, monitoring, and debugging
Apart from saving processed data to multiple data stores, it is crucial to monitor the performance of the ETL jobs and every other component of the analytics pipeline. To gain insights about the system, consider the following scenarios:
- An error or abnormal behavior is noticed while processing data from AXI edges.
- System latency rises above, or data freshness degrades beyond, a certain threshold.
- The system needs to be confirmed as healthy on an ongoing basis.
- System performance needs to be maintained as the volume of work increases.
Third-party tools such as Datadog can be used to export metrics about the system and the pipeline. Considering time and effort, however, we decided to go with the cloud logger tool for debugging and monitoring the system.
Logs are the data points that refer to any events that have occurred in our system. By default, the logs generated by our application code in the ETL jobs and the underlying core system logs are ingested into the cloud logger with the timestamp and the severity of the event. In the cloud logger tool, these logs are monitored and system-level/user-defined metrics are generated to create alerts based on the system alerting policy.
If the size of the data set streaming into the system grows, it may lead to a decrease in performance followed by component failure. It affects data freshness, system CPU, memory, and latency of the data being processed or waiting inside any one pipeline source.
In cloud logger tools, metrics are the only kind of data that can be used to create alerts. Metrics are created based on the specific labels in the log message and its severity. Alerting policies are defined using the above metrics, and alerts are created either in Slack or any incident management system such as Opsgenie, PagerDuty, etc. Alerts are triggered if the metric value exceeds the specified threshold.
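The flow from logs to metrics to alerts can be sketched in plain Python. The log entry shape (`severity` plus a `labels` dictionary), the label name, and both function names are assumptions made for illustration; a real cloud logger evaluates the equivalent rules on its side and forwards the alert to Slack or an incident manager.

```python
from typing import Dict, Iterable, Mapping

LogEntry = Dict[str, object]


def log_based_metric(
    entries: Iterable[LogEntry], severity: str, labels: Mapping[str, str]
) -> int:
    """Count entries that match the severity and every requested label."""
    count = 0
    for entry in entries:
        entry_labels = entry.get("labels", {})
        if entry.get("severity") == severity and all(
            entry_labels.get(name) == value for name, value in labels.items()
        ):
            count += 1
    return count


def should_alert(metric_value: int, threshold: int) -> bool:
    """An alert fires only when the metric strictly exceeds the threshold."""
    return metric_value > threshold
```

For example, counting `ERROR` entries labeled with a hypothetical `job` name over the last few minutes and comparing the count with a threshold is enough to drive a basic "ETL job is failing" alert.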
The incident manager acts as a dispatcher for alerts, determines the right people in the Ops team to notify based on on-call schedules, and escalates alerts until the alert is acknowledged or closed.
About the authors
Mohil Khare joined Prosimo at an early stage and led the design and development of the analytics pipeline. He is interested in Beam, analytics, machine learning, and Java, among other things. Before joining Prosimo, Mohil worked on multiple projects at Viptela, where he was a Senior Technical Leader responsible for multi-tenant cloud infrastructure. He has an M.S. in Computer Science from Georgia Tech.
A seasoned, creative software engineer, Senthil Nathan Chockalingam has been in the software industry for over seven years, delivering new and innovative solutions for Prosimo’s massive information technology and programming needs. He completed his Master’s degree in Computer Science at the University of Texas at Dallas. Upon graduating, he worked at Viptela and gained experience in setting up Ops infrastructure. His current responsibilities at Prosimo include setting up the Ops platform and building data pipelines (ETL jobs) to build aggregations and analysis on the data set.