9  Robust Pipelines with Monads

In the previous chapters, you learned functional programming fundamentals and how to build reproducible pipelines with {rixpress}. Now we’ll add another layer of robustness: monads.

Monads might sound abstract, but they solve three concrete problems: logging (tracing what each step does without cluttering your functions), error handling (letting failures propagate gracefully instead of crashing), and composition (keeping functions composable even when they need to do “extra” work).

By the end of this chapter, you’ll know how to integrate {chronicler} and cronista into your pipelines for more robust, observable data workflows.

9.1 Why Do We Need Logging?

When you build data pipelines, things inevitably go wrong. A dataset might have unexpected missing values. A computation might produce NaN. A join might drop more rows than expected. When these issues occur, the first question is always: where did it happen?

Consider a simple pipeline:

result <- raw_data |>
  clean_data() |>
  filter_outliers() |>
  compute_stats() |>
  generate_report()

If result is wrong or the pipeline crashes, you need to figure out which step failed. A way to do this is to add print statements:

result <- raw_data |>
  clean_data() |>
  (\(x) { print("clean_data done"); x })() |>
  filter_outliers() |>
  (\(x) { print("filter_outliers done"); x })() |>
  compute_stats() |>
  (\(x) { print("compute_stats done"); x })() |>
  generate_report()

This works, but it’s ugly and mixes logging logic with business logic. You have to remember to remove these print statements before sharing your code. Worse, if you want to capture what went into each step or how long it took, you end up with even more boilerplate.

Note: Nix Error Messages

When using {rixpress}, Nix does report errors during builds, but the messages can be hard to read—especially if the error happens deep in your pipeline or involves R/Python stack traces mixed with Nix output. Having explicit logging in your functions makes debugging much easier because you get clear, structured information about what happened at each step.

What if your functions could automatically log themselves without any changes to their core logic?

9.2 The Problem: Decorated Functions Don’t Compose

Suppose you want your functions to provide logs. You might write versions of sqrt() and log() that return a log alongside the result:

my_sqrt <- function(x, log = "") {
  list(
    result = sqrt(x),
    log = c(log, paste0("Running sqrt with input ", x))
  )
}

my_log <- function(x, log = "") {
  list(
    result = log(x),
    log = c(log, paste0("Running log with input ", x))
  )
}

These functions now return lists with both the result and a log. But there’s a problem: they don’t compose:

# This works:
10 |> sqrt() |> log()

# This fails:
10 |> my_sqrt() |> my_log()
# Error: non-numeric argument to mathematical function

my_log() expects a number, but my_sqrt() returns a list. We’ve broken composition.
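The same failure is easy to reproduce in the chapter's other language. In this minimal Python sketch, my_sqrt and my_log are hypothetical counterparts of the R functions above that return a dict where R returns a list; math.log() rejects the dict with a TypeError, much as R's log() rejects the list.

```python
import math


def my_sqrt(x, log=None):
    # "Decorated" sqrt: returns the result and a log, packed in a dict
    return {"result": math.sqrt(x),
            "log": (log or []) + [f"Running sqrt with input {x}"]}


def my_log(x, log=None):
    # "Decorated" log with the same shape as my_sqrt
    return {"result": math.log(x),
            "log": (log or []) + [f"Running log with input {x}"]}


# Undecorated functions compose without trouble
plain = math.log(math.sqrt(10))

# Decorated ones do not: my_log() receives a dict instead of a number
try:
    my_log(my_sqrt(10))
    composed_ok = True
except TypeError as err:
    composed_ok = False
    print("Composition failed:", err)
```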

9.3 The Solution: Function Factories and Bind

A monad provides two things:

  1. A function factory that decorates functions so they can provide additional output without rewriting their core logic
  2. A bind() function that makes these decorated functions compose

Here’s a simple function factory for logging:

log_it <- function(.f) {
  fstring <- deparse(substitute(.f))
  
  function(..., .log = NULL) {
    list(
      result = .f(...),
      log = c(.log, paste0("Running ", fstring, " with argument ", ...))
    )
  }
}

# Create decorated functions
l_sqrt <- log_it(sqrt)
l_log <- log_it(log)

l_sqrt(10)
#> $result
#> [1] 3.162278
#> $log
#> [1] "Running sqrt with argument 10"

Now we need bind() to make them compose:

bind <- function(.l, .f) {
  .f(.l$result, .log = .l$log)
}

# Now they compose!
10 |>
  l_sqrt() |>
  bind(l_log)

#> $result
#> [1] 1.151293
#> $log
#> [1] "Running sqrt with argument 10"
#> [2] "Running log with argument 3.16227766016838"

This pattern of a function factory plus bind() is the essence of a monad.
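For comparison, here is a minimal Python sketch of the same function factory and bind(), assuming a dict with "result" and "log" keys as the monadic value. The names log_it, bind and the _log keyword are illustrative only and not part of any library; f.__name__ plays the role of deparse(substitute(.f)).

```python
import math


def log_it(f):
    """Function factory: wrap f so it returns {"result": ..., "log": [...]}."""
    name = f.__name__  # plays the role of deparse(substitute(.f))

    def wrapped(*args, _log=None):
        args_str = ", ".join(str(a) for a in args)
        return {
            "result": f(*args),
            "log": (_log or []) + [f"Running {name} with argument {args_str}"],
        }

    return wrapped


def bind(m, f):
    """Pass m's result to the decorated f and carry m's log along."""
    return f(m["result"], _log=m["log"])


l_sqrt = log_it(math.sqrt)
l_log = log_it(math.log)

out = bind(l_sqrt(10), l_log)
print(out["result"])
for line in out["log"]:
    print(line)
```

Because bind() only relies on the "result" and "log" keys, any function produced by log_it() can be chained this way, however long the chain.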

9.4 A Deeper Look: The Formal Definition

While our function factory approach is practical for R programmers, it’s useful to understand how this relates to the formal definition in Haskell, the language that popularized monads in programming (the concept itself comes from category theory):

class Monad m where
  (>>=)  :: m a -> (a -> m b) -> m b  -- This is bind()
  (>>)   :: m a ->  m b       -> m b  -- We won't need this
  return ::   a               -> m a  -- This wraps a value

What this means is that a Monad has three operations:

  • >>= (called “bind”): exactly what we implemented as our bind() function
  • >> (called “then”): not essential for understanding, so we’ll skip it
  • return: a function that wraps a plain value into a monadic value

Don’t be confused by the name return—this has nothing to do with R’s return() for exiting functions. In monad terminology, return (also called unit) takes a value a and wraps it into a monadic value m a.

While we didn’t explicitly implement return/unit, our function factory accomplishes the same goal. We can easily derive unit from log_it():

unit <- log_it(identity)

unit(10)
#> $result
#> [1] 10
#> $log
#> [1] "Running identity with argument 10"

So return/unit is just the identity() function passed through our factory. In a sense, the function factory is even more fundamental than return/unit.
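The three Haskell operations can also be mapped onto a small Python class. This sketch uses hypothetical names (Logged, unit, bind, then): unit wraps a value, bind plays the role of >>=, and then plays the role of >>, which is simply bind with a function that ignores its argument. Unlike unit <- log_it(identity), this unit starts with an empty log, which is the usual choice for a Writer-style monad.

```python
import math
from dataclasses import dataclass


@dataclass(frozen=True)
class Logged:
    """A value paired with a log: a minimal Writer-style monad (sketch)."""
    value: object
    log: tuple = ()

    @staticmethod
    def unit(x):
        # return/unit: wrap a plain value, with an empty log
        return Logged(x)

    def bind(self, f):
        # >>= : f maps a plain value to a Logged; the logs are concatenated
        nxt = f(self.value)
        return Logged(nxt.value, self.log + nxt.log)

    def then(self, other):
        # >> : run `other` after self, ignoring self's value but keeping its log
        return self.bind(lambda _: other)


def l_sqrt(x):
    return Logged(math.sqrt(x), (f"Running sqrt with argument {x}",))


def l_log(x):
    return Logged(math.log(x), (f"Running log with argument {x}",))


res = Logged.unit(10).bind(l_sqrt).bind(l_log)
seq = l_sqrt(16).then(Logged("done", ("report written",)))
```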

9.5 Alternative Perspective: fmap, flatten, and flatmap

You may sometimes read that monads are objects with a flatmap() method. This is another valid perspective. To understand it, we need two helper functions: fmap() and flatten().

fmap() applies an undecorated function to a monadic value:

fmap <- function(m, f, ...) {
  fstring <- deparse(substitute(f))
  
  list(
    result = f(m$result, ...),
    log = c(m$log, paste0("fmapping ", fstring, " with argument ", m$result))
  )
}

# Create a monadic value using unit()
m <- unit(10)

# Apply an undecorated function
fmap(m, log)
#> $result
#> [1] 2.302585
#> $log
#> [1] "Running identity with argument 10"
#> [2] "fmapping log with argument 10"

But what happens if we use fmap() with a decorated function?

fmap(m, l_log)
#> $result
#> $result$result
#> [1] 2.302585
#> $result$log
#> [1] "Running log with argument 10"
#>
#> $log
#> [1] "Running identity with argument 10"
#> [2] "fmapping l_log with argument 10"

We get a nested structure—a monadic value containing another monadic value. This is where flatten() (sometimes called join()) comes in:

flatten <- function(m) {
  list(
    result = m$result$result,
    log = c(m$log, m$result$log)
  )
}

flatten(fmap(m, l_log))
#> $result
#> [1] 2.302585
#> $log
#> [1] "Running identity with argument 10"
#> [2] "fmapping l_log with argument 10"
#> [3] "Running log with argument 10"

Now here’s the key insight: flatmap is the composition of flatten and fmap:

# Define a composition operator
`%.%` <- \(f, g)(function(...)(f(g(...))))

# Compose flatten and fmap
# "flatten after fmap"
flatmap <- flatten %.% fmap

# Now we can use flatmap instead of bind:
10 |>
  l_sqrt() |>
  flatmap(l_log)

This gives the same value as using bind(); the only difference is that the log also contains the entry added by fmap() (“fmapping l_log with argument 3.16227766016838”). I prefer introducing monads via bind() because it comes naturally as a solution to the composition problem. But in some contexts, thinking in terms of fmap() and flatten() is more intuitive, so it’s good to know both approaches.
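The same construction can be sketched in Python with the dict-based representation used for log_it(). All names here are illustrative, and compose() stands in for the %.% operator. Comparing via_bind with via_flatmap shows the two routes agree on the value, while the flatmap() log carries the extra fmapping line, just like the R version.

```python
import math


def log_it(f):
    name = f.__name__

    def wrapped(x, _log=None):
        return {"result": f(x),
                "log": (_log or []) + [f"Running {name} with argument {x}"]}

    wrapped.__name__ = "l_" + name  # so fmap() can report "l_log"
    return wrapped


def bind(m, f):
    return f(m["result"], _log=m["log"])


def fmap(m, f):
    # Apply f to the wrapped result; if f is itself decorated, the result nests
    return {"result": f(m["result"]),
            "log": m["log"] + [f"fmapping {f.__name__} with argument {m['result']}"]}


def flatten(m):
    # Collapse a monadic value that contains another monadic value
    return {"result": m["result"]["result"],
            "log": m["log"] + m["result"]["log"]}


def compose(f, g):
    # "f after g", like the %.% operator in the R code
    return lambda *args: f(g(*args))


flatmap = compose(flatten, fmap)

l_sqrt = log_it(math.sqrt)
l_log = log_it(math.log)

nested = fmap(l_sqrt(10), l_log)          # a dict inside a dict
via_bind = bind(l_sqrt(10), l_log)
via_flatmap = flatmap(l_sqrt(10), l_log)
```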

9.6 Lists Are Monads Too

Here’s a surprising fact: lists are monads. We already have all the necessary ingredients in base R and {purrr}:

  • as.list() is return/unit
  • purrr::map() is fmap()
  • purrr::flatten() is flatten()

This means we can derive flatmap for lists:

# Compose flatten and map to get flatmap for lists
flatmap_list <- purrr::compose(purrr::flatten, purrr::map)

# Functions that return lists don't compose naturally...
list_sqrt <- \(x)(as.list(sqrt(x)))
list_log <- \(x)(as.list(log(x)))

# But with flatmap_list, they do!
10 |>
  list_sqrt() |>
  flatmap_list(list_log)
#> [[1]]
#> [1] 1.151293

This example illustrates that monads are not some exotic concept: they’re a pattern you’ve likely used without knowing it. Every time you map a list-returning function over a list and flatten the result, you’re doing monadic programming.
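In Python, the list monad is usually written with a comprehension. In this sketch, unit wraps a value in a one-element list, and flatmap applies a function to each element and then flattens one level. both_roots and neighbours are hypothetical list-returning functions, chosen because they return more than one element, which makes the difference between mapping and flat-mapping visible.

```python
import math


def unit(x):
    # return/unit for lists: wrap a value in a one-element list
    return [x]


def flatmap(xs, f):
    # fmap (apply f to every element) followed by flatten (drop one level)
    return [y for x in xs for y in f(x)]


def both_roots(x):
    # Hypothetical list-returning function: both square roots of x
    r = math.sqrt(x)
    return [r, -r]


def neighbours(x):
    # Hypothetical list-returning function: the values one below and one above x
    return [x - 1, x + 1]


# Plain mapping nests the lists...
nested = [neighbours(x) for x in both_roots(16)]

# ...while flatmap keeps a flat list, so steps chain like bind()
flat = flatmap(flatmap(unit(16), both_roots), neighbours)
```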

9.7 The chronicler Package

The {chronicler} package implements this pattern properly for R. It provides:

  • record(): A function factory that decorates functions
  • bind_record(): The bind operation
  • Automatic logging of all operations

library(chronicler)

# Decorate functions
r_sqrt <- record(sqrt)
r_exp <- record(exp)
r_mean <- record(mean)

# Compose them
result <- 1:10 |>
  r_sqrt() |>
  bind_record(r_exp) |>
  bind_record(r_mean)

# View the result
result$value
#> [1] 11.55345

# View the log
read_log(result)
#> [1] "Complete log:"
#> [2] "✔ sqrt ran successfully"
#> [3] "✔ exp ran successfully"
#> [4] "✔ mean ran successfully"

Tip: Getting LLM assistance with {chronicler}

If the functional programming patterns in {chronicler} feel unfamiliar, remember that you can use pkgctx to generate LLM-ready context. The {chronicler} repository includes a .pkgctx.yaml file you can feed to your LLM. You can also generate your own:

nix run github:b-rodrigues/pkgctx -- r github:b-rodrigues/chronicler > chronicler.pkgctx.yaml

With this context, your LLM can help you refactor your existing functions to be monadic using record() and bind_record().

9.8 The Maybe Monad: Handling Errors

Another common monad is Maybe, which handles computations that might fail. Instead of crashing, functions return either:

  • Just(value) if successful
  • Nothing if something went wrong

The {chronicler} package uses this under the hood:

r_sqrt <- record(sqrt)

# This works
r_sqrt(16)
#> ✔ Value: Just
#> [1] 4

# This fails gracefully
r_sqrt("not a number")
#> ✖ Value: Nothing

When Nothing is passed to a decorated function, it immediately returns Nothing, so the error propagates through the rest of the pipeline without crashing.
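The short-circuiting behaviour can be sketched in a few lines of plain Python. Just, Nothing, maybe() and bind() below are hypothetical names for this illustration; this is not how {chronicler} or cronista implement Maybe internally. Note that Python's math.sqrt(-1) raises a ValueError, whereas R's sqrt(-1) returns NaN with a warning.

```python
import math
from dataclasses import dataclass


@dataclass(frozen=True)
class Just:
    value: object


@dataclass(frozen=True)
class Nothing:
    error: str


def maybe(f):
    """Decorate f so that an exception yields Nothing instead of crashing."""
    def wrapped(x):
        try:
            return Just(f(x))
        except Exception as e:
            return Nothing(f"{f.__name__}: {e}")
    return wrapped


def bind(m, f):
    # Short-circuit: once a step returned Nothing, later steps never run
    if isinstance(m, Nothing):
        return m
    return f(m.value)


m_sqrt = maybe(math.sqrt)
m_log = maybe(math.log)

ok = bind(m_sqrt(16), m_log)   # Just(log(4.0))
bad = bind(m_sqrt(-1), m_log)  # Nothing from sqrt; m_log is skipped
```

Because bind() hands back a Nothing as soon as it sees one, m_log is never called in the failing case, and the stored message still names the step that failed.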

9.9 Monads in Python: cronista

The same concepts exist in Python. The cronista package (the Python equivalent of {chronicler}) provides the same record() decorator and logging capabilities:

from cronista import record, read_log, unveil
import math

# Decorate functions
r_sqrt = record(math.sqrt)
r_exp = record(math.exp)

@record()
def compute_mean(values):
    return sum(values) / len(values)

# Compose them
result = (
    r_sqrt(16)
      .bind_record(r_exp)
)

# View the result
print(unveil(result, "value"))  # 54.59815...

# View the log
print(read_log(result, style="pretty"))
#> OK `sqrt` at 2025-01-31 12:00:00 (0.001s)
#> OK `exp` at 2025-01-31 12:00:00 (0.001s)
#> Total: 0.002 secs

When a computation fails, cronista captures the error as Nothing:

@record()
def divide_by_zero():
    return 1 / 0

result = divide_by_zero()
print(result.is_ok())  # False (this is a Nothing)
print(read_log(result, style="errors-only"))
#> NOK `divide_by_zero` — ZeroDivisionError: division by zero

Tip: Getting LLM assistance with cronista

Just like with {chronicler}, you can use pkgctx to help your LLM understand cronista:

nix run github:b-rodrigues/pkgctx -- python github:b-rodrigues/cronista > cronista.pkgctx.yaml

This is particularly useful for learning how to chain monadic operations in Python correctly.

The key operations mirror those of {chronicler}:

  • record(fn): Decorate a function to return a Chronicle
  • .bind_record(rfn): Chain with another recorded function
  • read_log(c): View the operation log

9.10 Building Robust Pipelines

The real power of monads becomes apparent when you combine them with pipeline orchestration. Consider a typical data pipeline:

# Standard pipeline - fragile
raw_data |>
  basic_cleaning() |>
  recodings() |>
  filter_arrivals() |>
  make_monthly() |>
  make_plot()

If recodings() fails halfway through, the entire pipeline crashes. You get an error message, but no information about what succeeded before the failure.

Now imagine wrapping each function with record():

library(chronicler)

# Robust pipeline with logging
r_basic_cleaning <- record(basic_cleaning)
r_recodings <- record(recodings)
r_filter_arrivals <- record(filter_arrivals)
r_make_monthly <- record(make_monthly)
r_make_plot <- record(make_plot)

result <- raw_data |>
  r_basic_cleaning() |>
  bind_record(r_recodings) |>
  bind_record(r_filter_arrivals) |>
  bind_record(r_make_monthly) |>
  bind_record(r_make_plot)

# Now you get:
# - The result (or Nothing if any step failed)
# - A complete log of which steps ran
# - Exactly where and why it failed
read_log(result)

This pattern transforms a fragile script into a robust, observable pipeline.
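To make this behaviour concrete without depending on either package's API, here is a plain-Python sketch that combines the logging and Maybe ideas. The record() and bind_record() below are hypothetical stand-ins written for this example, not cronista's functions, and basic_cleaning, recodings and make_total are toy versions of the pipeline steps above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Chronicle:
    ok: bool       # True plays the role of Just, False of Nothing
    value: object
    log: tuple


def record(f):
    """Hypothetical stand-in for record(): catch errors and log every step."""
    def wrapped(x, _log=()):
        try:
            return Chronicle(True, f(x), _log + (f"OK {f.__name__}",))
        except Exception as e:
            msg = f"NOK {f.__name__}: {type(e).__name__}: {e}"
            return Chronicle(False, None, _log + (msg,))
    wrapped.__name__ = f.__name__
    return wrapped


def bind_record(c, rf):
    """Hypothetical stand-in for bind_record(): skip steps after a failure."""
    if not c.ok:
        return Chronicle(False, None, c.log + (f"SKIPPED {rf.__name__}",))
    return rf(c.value, _log=c.log)


# Toy pipeline steps (hypothetical)
def basic_cleaning(rows):
    return [r for r in rows if r is not None]


def recodings(rows):
    return [int(r) for r in rows]  # fails on non-numeric strings


def make_total(rows):
    return sum(rows)


r_basic_cleaning = record(basic_cleaning)
r_recodings = record(recodings)
r_make_total = record(make_total)


def run(raw):
    c = r_basic_cleaning(raw)
    c = bind_record(c, r_recodings)
    return bind_record(c, r_make_total)


good = run(["10", None, "20"])
bad = run(["10", None, "20", "n/a"])
for line in bad.log:
    print(line)
```

The log of the failing run shows which steps succeeded, which one failed and why, and which ones were skipped, without the script ever raising.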

9.11 Integrating with rixpress

You can combine the power of {chronicler} with {rixpress} for even more robust pipelines. The key insight is that your user_functions can use record() internally:

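# functions.R
library(chronicler)
library(dplyr)    # select(), mutate()
library(tidyr)    # pivot_longer()
library(forcats)  # fct_recode()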

# Create recorded versions of your functions
r_basic_cleaning <- record(function(data) {
  data |>
    select(contains("TIME"), contains("20")) |>
    pivot_longer(cols = contains("20"),
                 names_to = "date",
                 values_to = "passengers")
})

r_recodings <- record(function(data) {
  data |>
    mutate(tra_meas = fct_recode(tra_meas, ...)) |>  # replace ... with your recoding pairs
    mutate(passengers = as.numeric(passengers))
})

# Export a pipeline function that uses bind_record
process_aviation_data <- function(raw_data) {
  raw_data |>
    r_basic_cleaning() |>
    bind_record(r_recodings)
}

Then in your {rixpress} pipeline:

library(rixpress)

list(
  rxp_r_file(
    name = avia_raw,
    path = "data/avia.tsv",
    read_function = readr::read_tsv
  ),
  
  rxp_r(
    name = processed_data,
    expr = process_aviation_data(avia_raw),
    user_functions = "functions.R"
  ),
  
  # ... more steps
) |>
  rxp_populate()

The {rixpress} derivation caches the result while {chronicler} provides logging and error handling within the step.

9.12 Python Integration with cronista

The same pattern works in Python pipelines using cronista:

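# functions.py
from cronista import record

# category_mapping (raw category codes -> labels) is assumed to be
# defined elsewhere in this file; it is not shown here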

# Decorate your functions
r_basic_cleaning = record(lambda df: df.dropna().reset_index(drop=True))
r_recodings = record(lambda df: df.assign(
    category=df["category"].map(category_mapping)
))
r_filter_arrivals = record(lambda df: df[df["tra_meas"] == "Arrivals"])

def process_data(raw_df):
    """Chain operations with bind_record for logging and error handling."""
    return (
        r_basic_cleaning(raw_df)
          .bind_record(r_recodings)
          .bind_record(r_filter_arrivals)
    )

Then in your {rixpress} pipeline:

rxp_py(
  name = processed_data,
  expr = "process_data(raw_data)",
  user_functions = "functions.py"
)

9.13 The Monadic Laws

For the mathematically curious, monads must satisfy three laws. While you don’t need to know these to use monads effectively, understanding them helps you verify that an implementation is correct.

9.13.1 First Law: Left Identity

Passing a value to return/unit and then binding it to a function f is the same as applying f directly to the value:

library(chronicler)

a <- as_chronicle(10)  # as_chronicle is chronicler's "return/unit"
r_sqrt <- record(sqrt)

# These should give the same result:
bind_record(a, r_sqrt)$value
#> [1] 3.162278

r_sqrt(10)$value
#> [1] 3.162278

9.13.2 Second Law: Right Identity

Binding a monadic value to return/unit returns the original value unchanged:

a <- as_chronicle(10)

# Binding to as_chronicle should return the same value:
bind_record(a, as_chronicle)$value
#> [1] 10

a$value
#> [1] 10

9.13.3 Third Law: Associativity

The way bind operations are nested doesn’t matter: chaining binds one after the other gives the same result as binding once to a function that itself chains the later steps. In Haskell notation, (m >>= f) >>= g is equivalent to m >>= (\x -> f x >>= g):

a <- as_chronicle(10)

r_sqrt <- record(sqrt)
r_exp <- record(exp)

# Method 1: Chain binds sequentially
result1 <- bind_record(a, r_sqrt) |>
  bind_record(r_exp)

# Method 2: Bind once, to a function that chains r_sqrt and r_exp itself
# (`...` absorbs any extra arguments bind_record() may pass, such as the log)
composed_fn <- \(x, ...) r_sqrt(x) |> bind_record(r_exp)
result2 <- bind_record(a, composed_fn)

# These should be equal:
result1$value
#> [1] 23.62434

result2$value
#> [1] 23.62434

Note: A Note on Logs

In {chronicler}, the logs of these equivalent expressions may differ slightly (since they record different paths through the code), but the values will always be identical. This is the important property for computational correctness.

These laws ensure that monadic code behaves predictably—you can refactor bind chains without changing semantics, and you can trust that composition works as expected.
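The three laws can also be checked mechanically. This sketch uses a minimal Writer-style Logged type with hypothetical unit, bind, l_sqrt and l_exp functions (illustrative, not chronicler's implementation). Its unit adds no log entry, so equality here covers both the value and the log.

```python
import math
from dataclasses import dataclass


@dataclass(frozen=True)
class Logged:
    value: object
    log: tuple = ()


def unit(x):
    # return/unit: wrap a value without adding a log entry
    return Logged(x)


def bind(m, f):
    # >>= : apply f to the value and concatenate the logs
    nxt = f(m.value)
    return Logged(nxt.value, m.log + nxt.log)


def l_sqrt(x):
    return Logged(math.sqrt(x), (f"sqrt({x})",))


def l_exp(x):
    return Logged(math.exp(x), (f"exp({x})",))


a = 10
m = unit(a)

# Law 1, left identity: unit(a) >>= f  ==  f(a)
left_identity = bind(unit(a), l_sqrt) == l_sqrt(a)

# Law 2, right identity: m >>= unit  ==  m
right_identity = bind(m, unit) == m

# Law 3, associativity: (m >>= f) >>= g  ==  m >>= (x -> f(x) >>= g)
lhs = bind(bind(m, l_sqrt), l_exp)
rhs = bind(m, lambda x: bind(l_sqrt(x), l_exp))
associativity = lhs == rhs

print(left_identity, right_identity, associativity)
```

Because this unit leaves the log untouched, even the logs agree. With a unit that records its own call, such as log_it(identity), the two identity laws would only hold for the values.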

9.14 Detecting Silent Failures in Pipelines

Here’s the catch: when you use {chronicler} or cronista functions in a {rixpress} pipeline, a failed computation does not make the Nix build fail. Even when a computation produces Nothing, it’s still a valid object that gets serialized successfully.

Consider this pipeline:

library(rixpress)

list(
  rxp_r(
    name = result,
    expr = r_sqrt(-1),  # This produces Nothing, not an error!
    user_functions = "functions.R"
  )
) |> rxp_populate(build = FALSE)

When you run rxp_make(), the build succeeds! But result contains Nothing, meaning the computation actually failed. Without checking, you might think your pipeline worked perfectly.

9.14.1 Automatic Chronicle Checking

When {chronicler} is available, rxp_make() automatically checks pipeline outputs for Nothing values after a successful build:

rxp_make()
#> Build successful! Run `rxp_inspect()` for a summary.
#> ...
#> 
#> This pipeline uses {chronicler}. Here is a summary of chronicle results:
#> Chronicle status:
#> ✓ filtered_mtcars (chronicle: OK)
#> ✓ mtcars_mpg (chronicle: OK)
#> ✓ mean_mpg (chronicle: OK)
#> ✗ sqrt_of_negative (chronicle: NOTHING)
#>     Failed: sqrt
#>     Message: NaNs produced
#> ✗ downstream_of_nothing (chronicle: NOTHING)
#> 
#> Summary: 3 success, 0 with warnings, 2 nothing
#> Warning: 2 derivation(s) contain Nothing values!

You can also check manually at any time:

rxp_check_chronicles()

Chronicles can be in one of three states:

State         Meaning
Success (✓)   Just value, no warnings or errors
Warning       Just value, but warnings were captured
Nothing (✗)   Failed computation, errors captured
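Checking for silent failures boils down to inspecting the state of each output after the build. The sketch below is a hypothetical plain-Python illustration of that idea: Chronicle, status() and check_chronicles() are names made up for this example, the "!" marker for warnings is this sketch's own choice, and none of it reflects how rxp_check_chronicles() is actually implemented.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Chronicle:
    ok: bool                # False means the computation ended in Nothing
    value: object = None
    warnings: tuple = ()
    error: str = ""


def status(c):
    """Map a chronicle to one of the three states described above."""
    if not c.ok:
        return "Nothing"
    return "Warning" if c.warnings else "Success"


MARKS = {"Success": "✓", "Warning": "!", "Nothing": "✗"}  # "!" is this sketch's choice


def check_chronicles(outputs):
    """Hypothetical checker: report each named output and count the states."""
    counts = {"Success": 0, "Warning": 0, "Nothing": 0}
    for name, c in outputs.items():
        s = status(c)
        counts[s] += 1
        detail = f": {c.error}" if c.error else ""
        print(f"{MARKS[s]} {name} ({s}){detail}")
    return counts


outputs = {
    "sqrt_result": Chronicle(True, 4.0),
    "noisy_mean": Chronicle(True, 2.5, warnings=("1 missing value dropped",)),
    "div_by_zero": Chronicle(False, error="ZeroDivisionError: division by zero"),
}

summary = check_chronicles(outputs)
should_fail_ci = summary["Nothing"] > 0
```

In a CI job, a non-zero Nothing count is a natural signal to fail the run, since the Nix build itself will have succeeded.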

9.14.2 Python: cronista + ryxpress

The same pattern works in Python with cronista (the Python equivalent of chronicler) and ryxpress:

# gen-pipeline.R using Python functions
library(rixpress)

list(
  rxp_py(
    name = sqrt_result,
    expr = "r_sqrt(16)",            # Success: Just(4.0)
    user_functions = "functions.py"
  ),
  
  rxp_py(
    name = div_by_zero,
    expr = "divide_by_zero()",      # Failure: Nothing
    user_functions = "functions.py"
  )
) |> rxp_populate(build = FALSE)

Where functions.py uses cronista’s record():

# functions.py
import math
from cronista import record

r_sqrt = record(math.sqrt)

@record()
def divide_by_zero():
    return 1 / 0  # This error is captured, not raised

After building, use ryxpress to check for Nothing values:

from ryxpress import rxp_check_chronicles

rxp_check_chronicles()
#> Chronicle status:
#> ✓ sqrt_result (chronicle: OK)
#> ✗ div_by_zero (chronicle: NOTHING)
#>     Failed: divide_by_zero
#>     Message: ZeroDivisionError: division by zero

Reading a chronicle with Nothing automatically warns you:

from ryxpress import rxp_read

result = rxp_read("div_by_zero")
#> UserWarning: Derivation 'div_by_zero' contains a chronicle with Nothing value!
#> Use cronista.read_log() on this object for details.

Tip: Getting LLM assistance with cronista

Just like with the R packages, you can generate context for your LLM:

nix run github:b-rodrigues/pkgctx -- python github:b-rodrigues/cronista > cronista.pkgctx.yaml

This helps your LLM understand how to use record(), bind_record(), and read_log() in Python.

9.15 Summary

Monads add a layer of robustness to your pipelines. In this chapter, we covered:

Core Concepts:

  • A monad provides a function factory (to decorate functions) and a bind() function (to compose decorated functions)
  • The relationship between bind(), fmap(), flatten(), and flatmap()
  • The formal Haskell definition with >>=, >>, and return
  • Lists as a familiar example of monads (via purrr::map and purrr::flatten)
  • The three monadic laws that ensure predictable behaviour

Practical Tools:

  • chronicler (record(), bind_record()): Logging and error handling for R
  • cronista (record(), .bind_record()): The Python equivalent with full logs
  • Just/Nothing: Graceful failure propagation without crashes
  • Integration with rixpress: Monads within pipeline steps for observability

Combined with {rixpress}:

  • Nix provides hermetic, cached execution
  • Monads provide logging and error handling
  • rxp_check_chronicles() detects silent failures in your pipeline
  • Together they give you robust, reproducible, observable pipelines

This completes our code quality toolkit: environments (Nix), functional code (FP), testing, pipelines (rixpress), and robustness (monads). In the next chapter, we’ll learn how to distribute our work with Docker, followed by packaging and continuous integration.