
DAG, Infra & Python

This section is for technical users who want advanced configuration of Patterns Analysts

Analyst Files

An Analyst is defined by a bundle of code and configuration files. Like a Terraform project, an Analyst’s Files can be thought of as “data infrastructure as code”: fully defined by git-controllable files. Analyst Files can be developed manually in the DAG/IDE interface, locally using your own dev tools and the local devkit, or generated by the Analyst itself.
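
For illustration only, an Analyst’s Files might be laid out roughly like the hypothetical sketch below (the graph.yml manifest and file names here are assumptions, not a prescribed layout):

my_analyst/
├── graph.yml           # graph configuration: nodes, connections, schedules
├── generate_data.py    # a Python node
├── aggregate.sql       # a SQL node
└── chart.json          # a Chart node definition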

Execution operations leverage a proprietary library, Patterns, that defines methods for building Nodes connected in a directed graph that defines the flow of data. The basic node types combine to build powerful end-to-end data workflows: Python, SQL, and Webhook nodes for computation and IO, Table stores for storage, and Chart and Markdown nodes for presentation.

Reactive Node Graph

The Graph is the execution and orchestration engine that powers all data flow for an app. When you write function nodes (Python, SQL), you define their storage inputs/outputs (Tables), as in the example below, where we generate data in Python and then operate on it in SQL.

Python: Generate data

import random
from patterns import Table


# An output (writeable) Table
table = Table("random_data", "w")

for i in range(100):
    table.append({"random": random.randint(0, 100), "index": i})

SQL: Aggregate

select
    index % 10 as bucket
  , avg(random) as average
from {{ Table("random_data") }} as t
group by 1

Through this process of defining where to read and write data, Patterns automatically infers node relationships and manages execution dependencies between the nodes.

Patterns is reactive, meaning nodes update automatically when upstream nodes generate new data, so your App is always up-to-date. For root nodes, or expensive operations, you can also set an explicit schedule to manually control when the node runs.

Node types

Python

Python nodes execute a Python script that optionally interacts with any number of Table store inputs and outputs. Each node executes in a dedicated, isolated compute container and has access to standard Python data libraries. Here’s a simple Python node that reads data from one Table, augments it, and writes it to another:

from datetime import datetime
from patterns import Table

leads = Table("new_leads")
enriched = Table("enriched_leads", mode="w")

leads_df = leads.read_dataframe()

# Do other (more interesting things) here optionally
leads_df["processed_at"] = now()

# Reset the enriched table to create a new version
enriched.reset()
# Write the dataframe to that version
enriched.append(leads_df)

SQL

SQL nodes interact with Table stores and execute against the database those stores are attached to. They use {{ }} templating to specify tables and parameters safely. By default, a select statement creates a new table version on the connected output Table. Here’s a simple SQL node:

select
    customer_id
  , sum(amount) as total_sales
from {{ Table("transactions") }} as t
group by 1
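
Parameters can be templated the same way. A minimal sketch is below; it assumes a Parameter helper is available in SQL templating, and the min_amount parameter name is made up for illustration:

select
    customer_id
  , sum(amount) as total_sales
from {{ Table("transactions") }} as t
where amount >= {{ Parameter("min_amount") }}
group by 1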

Table

Table nodes are storage nodes similar to standard database tables, but they provide a layer of abstraction that enables data versioning, retention, and cross-platform support. Each Table is attached to a specific database and creates actual database tables based on the operations of Python and SQL nodes. These underlying tables share the Table name but are suffixed with a hash (monotonically increasing in time) for each version.
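
In practice you read and write through the Table API rather than touching versions directly. A minimal sketch, using the calls shown above (the table names are made up):

from patterns import Table

# Read the latest version of an input Table
orders = Table("orders")

# An output Table; writes land in a new version
daily = Table("daily_totals", "w")

df = orders.read_dataframe()

# Reset to start a fresh version, then write to it
daily.reset()
daily.append(df)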

Stream

Streams are stateful views of Tables for processing records one at a time, enabling real-time processing with Python nodes. They are created with table.as_stream(), which by default streams records in the order they were written, using the patterns_id field that is automatically added to all records written from Python or Webhook nodes. Alternatively, you can explicitly specify the ordering with table.as_stream(order_by="my_field"). Streams persist consumption progress across executions, so records are processed once even if an execution is interrupted or restarted. Note that if the ordering field isn’t strictly monotonic, there is no guarantee that records will be processed once and only once when processing is spread over multiple executions.

Python: message stream to table

import random
from datetime import datetime
from patterns import Table

# An input (readable) Table
messages = Table("messages")

# An output (writeable) Table
messages_with_metadata = Table("messages_with_metadata", "w")

# Use a stream to process each record incrementally just once
stream = messages.as_stream()

for record in stream.consume_records():
    record["processed_at"] = now()
    record["random"] = random.randint(0,100)
    messages_with_metadata.append(record)

Webhook

Webhook nodes provide an endpoint that you can POST JSON data to; the data is stored in a Table. Webhooks are a convenient way to stream data into an app and process it in real time.
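
For example, once a webhook node is created you can post records to its endpoint from anywhere. A minimal sketch using Python’s requests library (the URL is a placeholder; use the endpoint shown on your webhook node):

import requests

# Placeholder -- copy the real endpoint URL from your webhook node
WEBHOOK_URL = "https://<your-webhook-endpoint>"

# Each JSON payload posted here is appended as a record to the webhook's Table
response = requests.post(WEBHOOK_URL, json={"user_id": 123, "event": "signup"})
response.raise_for_status()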

Chart

Chart nodes allow you to visualize data from a Table and display it directly in the graph. Charts are defined by a JSON file specifying a valid vega-lite chart. The easiest way to get started is to explore the library of example charts here: https://vega.github.io/vega-lite/examples/.
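
As a minimal sketch, a bar chart over the aggregate example above might use a vega-lite spec like the following (how the input Table’s data is bound to the chart is configured on the node and omitted here):

{
  "$schema": "https://vega.github.io/schema/vega-lite/v5.json",
  "mark": "bar",
  "encoding": {
    "x": {"field": "bucket", "type": "ordinal"},
    "y": {"field": "average", "type": "quantitative"}
  }
}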

Markdown

Markdown nodes allow you to document and explain your graph in-place. They accept standard markdown syntax.

Secrets and Connections

Secrets give you a place to manage sensitive organization-wide parameters (e.g. API keys) and use them in components and nodes. Similarly, Connections provide an explicit OAuth connection to various vendors that can be used in components.

Schemas

All data that moves through Patterns is associated with a Schema, a description of the data’s fields and semantic details, much like a traditional database DDL “CREATE TABLE” statement, but more general. Schemas enable components and nodes to interoperate safely, since each knows the structure of the data to expect. See the Common Model project for more details on the Schema specification. Declaring explicit schemas on your stores is always optional, since Patterns can automatically infer them from existing data, but doing so is encouraged for re-usable component development.

Local development and Version Control

Patterns ships with a command-line devkit that enables you to pull and push your Analyst Files to and from the Patterns platform and work locally on the node and graph files in your own development tools. This is also the recommended way to provide robust version control for complex, production Analysts: pull the files locally and use git for versioning.

Data Retention and Versioning

By default, Patterns retains copies of old versions of Table stores for up to 7 days. Upgraded accounts can configure the retention period.

Marketplace

The Patterns Marketplace is a repository of pre-built components and apps that you can use and fork in your own projects. The modular nature of the Patterns platform means that often your use case has already been partially or fully solved and exists in the marketplace, so there’s no need to re-invent the wheel.
