8  Robust Pipelines with Monads

In the previous chapters, you learned functional programming fundamentals and how to build reproducible pipelines with {rixpress}. Now we’ll add another layer of robustness: monads.

Monads might sound abstract, but they solve concrete problems:

  • Logging: tracing what each step does without cluttering your functions
  • Error handling: letting failures propagate gracefully instead of crashing
  • Composition: keeping functions composable even when they need to do “extra” work

By the end of this chapter, you’ll know how to integrate {chronicler} and talvez into your pipelines for more robust, observable data workflows.

8.1 The Problem: Decorated Functions Don’t Compose

Suppose you want your functions to provide logs. You might rewrite sqrt() like this:

my_sqrt <- function(x, log = "") {
  list(
    result = sqrt(x),
    log = c(log, paste0("Running sqrt with input ", x))
  )
}

my_log <- function(x, log = "") {
  list(
    result = log(x),
    log = c(log, paste0("Running log with input ", x))
  )
}

These functions now return lists with both the result and a log. But there’s a problem: they no longer compose.

# This works:
10 |> sqrt() |> log()

# This fails:
10 |> my_sqrt() |> my_log()
# Error: non-numeric argument to mathematical function

my_log() expects a number, but my_sqrt() returns a list. We’ve broken composition.
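The same breakage appears in any language. Here is a minimal Python sketch of it; the names my_sqrt and my_log are hypothetical stand-ins that mirror the R functions above.

```python
import math

def my_sqrt(x, log=()):
    """Return the square root of x together with an updated log."""
    return {"result": math.sqrt(x), "log": [*log, f"Running sqrt with input {x}"]}

def my_log(x, log=()):
    """Return the natural log of x together with an updated log."""
    return {"result": math.log(x), "log": [*log, f"Running log with input {x}"]}

# Plain functions compose: each output is a valid input for the next one.
plain = math.log(math.sqrt(10))

# Decorated functions do not: my_log() receives a dict instead of a number.
try:
    my_log(my_sqrt(10))
    composed = True
except TypeError:
    composed = False
```

The extra output changed the return type, so the output of one function is no longer a valid input for the next.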

8.2 The Solution: Function Factories and Bind

A monad provides two things:

  1. A function factory that decorates functions so they can provide additional output without rewriting their core logic
  2. A bind() function that makes these decorated functions compose

Here’s a simple function factory for logging:

log_it <- function(.f) {
  fstring <- deparse(substitute(.f))
  
  function(..., .log = NULL) {
    list(
      result = .f(...),
      log = c(.log, paste0("Running ", fstring, " with argument ", ...))
    )
  }
}

# Create decorated functions
l_sqrt <- log_it(sqrt)
l_log <- log_it(log)

l_sqrt(10)
#> $result
#> [1] 3.162278
#> $log
#> [1] "Running sqrt with argument 10"

Now we need bind() to make them compose:

bind <- function(.l, .f) {
  .f(.l$result, .log = .l$log)
}

# Now they compose!
10 |>
  l_sqrt() |>
  bind(l_log)

#> $result
#> [1] 1.151293
#> $log
#> [1] "Running sqrt with argument 10"
#> [2] "Running log with argument 3.16227766016838"

This pattern of a function factory plus bind() is the essence of a monad.
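For comparison, the same factory-plus-bind pattern can be sketched in Python. This is only an illustration under simplifying assumptions (single-argument functions, a dict as the container); log_it and bind are hand-rolled here and do not come from any package.

```python
import math

def log_it(f):
    """Function factory: wrap f so it returns its result plus a log."""
    def wrapped(x, log=()):
        return {
            "result": f(x),
            "log": [*log, f"Running {f.__name__} with argument {x}"],
        }
    return wrapped

def bind(wrapped_value, f):
    """Unpack a decorated result and pass it, with its log, to the next decorated function."""
    return f(wrapped_value["result"], log=wrapped_value["log"])

# Create decorated functions
l_sqrt = log_it(math.sqrt)
l_log = log_it(math.log)

# Compose them: bind() does the unpacking that the functions cannot do themselves
out = bind(l_sqrt(10), l_log)
```

Here out["result"] holds the final number and out["log"] holds one entry per step, in order. The core logic of sqrt and log was never rewritten; only the wrapper and bind() know about the log.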

8.3 The chronicler Package

The {chronicler} package implements this pattern properly for R. It provides:

  • record(): A function factory that decorates functions
  • bind_record(): The bind operation
  • Automatic logging of all operations

library(chronicler)

# Decorate functions
r_sqrt <- record(sqrt)
r_exp <- record(exp)
r_mean <- record(mean)

# Compose them
result <- 1:10 |>
  r_sqrt() |>
  bind_record(r_exp) |>
  bind_record(r_mean)

# View the result
result$value
#> [1] 11.55345

# View the log
read_log(result)
#> [1] "Complete log:"
#> [2] "✔ sqrt ran successfully"
#> [3] "✔ exp ran successfully"
#> [4] "✔ mean ran successfully"

Getting LLM assistance with {chronicler}

If the functional programming patterns in {chronicler} feel unfamiliar, remember that you can use pkgctx to generate LLM-ready context. The {chronicler} repository includes a .pkgctx.yaml file you can feed to your LLM. You can also generate your own:

nix run github:b-rodrigues/pkgctx -- r github:b-rodrigues/chronicler > chronicler.pkgctx.yaml

With this context, your LLM can help you refactor your existing functions to be monadic using record() and bind_record().

8.4 The Maybe Monad: Handling Errors

Another common monad is Maybe, which handles computations that might fail. Instead of crashing, functions return either:

  • Just(value) if successful
  • Nothing if something went wrong

The {chronicler} package uses this under the hood:

r_sqrt <- record(sqrt)

# This works
r_sqrt(16)
#> ✔ Value: Just
#> [1] 4

# This fails gracefully
r_sqrt("not a number")
#> ✖ Value: Nothing

When Nothing is passed to a decorated function, the function immediately returns Nothing, so the failure propagates through the pipeline without crashing it.
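The short-circuiting behaviour can be sketched in a few lines of Python. This is a hand-rolled illustration, not how {chronicler} is implemented: Just, Nothing, NOTHING, maybe and bind are all hypothetical names defined here.

```python
import math
from dataclasses import dataclass
from typing import Any

@dataclass(frozen=True)
class Just:
    """Container for a successfully computed value."""
    value: Any

class Nothing:
    """Marker for a failed computation."""
    def __repr__(self):
        return "Nothing"

NOTHING = Nothing()

def maybe(f):
    """Wrap f so that an exception becomes NOTHING instead of a crash."""
    def wrapped(x):
        try:
            return Just(f(x))
        except Exception:
            return NOTHING
    return wrapped

def bind(m, f):
    """Apply a wrapped function to a Just; pass NOTHING through untouched."""
    return f(m.value) if isinstance(m, Just) else NOTHING

m_sqrt = maybe(math.sqrt)
m_log = maybe(math.log)

ok = bind(m_sqrt(16), m_log)                  # a Just holding log(4)
failed = bind(m_sqrt("not a number"), m_log)  # NOTHING; m_log is never called
```

Because bind() checks for NOTHING before calling the next function, a failure in the first step skips every later step without any of them having to handle it.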

8.5 Monads in Python: talvez

The same concepts exist in Python. The talvez package provides a Maybe monad:

from talvez import maybe, just

@maybe()
def parse_int(s: str) -> int:
    return int(s)

@maybe(ensure=lambda x: x != 0)
def reciprocal(n: int) -> float:
    return 1 / n

# Successful computation
result = (
    parse_int("25")
      .bind(reciprocal)
      .fmap(lambda x: x * 100)
)
print(result)            # Just(4.0)
print(result.get_or(-1)) # 4.0

# Failed computation
bad = (
    parse_int("not a number")
      .bind(reciprocal)
      .fmap(lambda x: x * 100)
)
print(bad)  # Nothing

Getting LLM assistance with talvez

Just like with {chronicler}, you can use pkgctx to help your LLM understand talvez. The repository includes a .pkgctx.yaml file, or you can generate one:

nix run github:b-rodrigues/pkgctx -- python github:b-rodrigues/talvez > talvez.pkgctx.yaml

This is particularly useful for learning how to chain monadic operations in Python correctly.

The key operations are:

  • fmap(fn): Apply a pure function to the value inside the monad
  • bind(fn): Apply a function that itself returns a monad
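To see why both operations are needed, here is a small hand-rolled sketch. The Maybe class below is an assumption made for illustration and is not the talvez implementation; it only shows what happens when fmap is used where bind is required.

```python
from dataclasses import dataclass
from typing import Any

@dataclass(frozen=True)
class Maybe:
    """Toy Maybe: ok=True plays the role of Just, ok=False of Nothing."""
    value: Any = None
    ok: bool = True

    def fmap(self, fn):
        """Apply a plain function and re-wrap its return value."""
        return Maybe(fn(self.value)) if self.ok else self

    def bind(self, fn):
        """Apply a function that already returns a Maybe; no re-wrapping."""
        return fn(self.value) if self.ok else self

NOTHING = Maybe(ok=False)

def reciprocal(n):
    """A monadic function: it returns a Maybe itself."""
    return Maybe(1 / n) if n != 0 else NOTHING

# bind for the monadic step, fmap for the plain one: a single, flat Maybe
flat = Maybe(25).bind(reciprocal).fmap(lambda x: x * 100)

# fmap with a monadic function wraps twice: a Maybe inside a Maybe
nested = Maybe(25).fmap(reciprocal)

# A failure short-circuits both operations
short = Maybe(0).bind(reciprocal).fmap(lambda x: x * 100)
```

The rule of thumb: if the function you are applying returns a bare value, use fmap; if it returns a monad, use bind so the result stays one level deep.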

8.6 Building Robust Pipelines

The real power of monads becomes apparent when you combine them with pipeline orchestration. Consider a typical data pipeline:

# Standard pipeline - fragile
raw_data |>
  basic_cleaning() |>
  recodings() |>
  filter_arrivals() |>
  make_monthly() |>
  make_plot()

If recodings() fails halfway through, the entire pipeline crashes. You get an error message, but no information about what succeeded before the failure.

Now imagine wrapping each function with record():

library(chronicler)

# Robust pipeline with logging
r_basic_cleaning <- record(basic_cleaning)
r_recodings <- record(recodings)
r_filter_arrivals <- record(filter_arrivals)
r_make_monthly <- record(make_monthly)
r_make_plot <- record(make_plot)

result <- raw_data |>
  r_basic_cleaning() |>
  bind_record(r_recodings) |>
  bind_record(r_filter_arrivals) |>
  bind_record(r_make_monthly) |>
  bind_record(r_make_plot)

# Now you get:
# - The result (or Nothing if any step failed)
# - A complete log of which steps ran
# - Exactly where and why it failed
read_log(result)

This pattern transforms a fragile script into a robust, observable pipeline.
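To make the idea concrete outside of R, the sketch below combines logging and failure propagation in plain Python. Everything in it is hypothetical and hand-rolled for illustration (Recorded, record_step, bind_step and the three toy steps); it is not the {chronicler} API, and the log format is invented.

```python
import functools
from dataclasses import dataclass

@dataclass(frozen=True)
class Recorded:
    """A value (or None on failure), a success flag, and the log so far."""
    value: object = None
    ok: bool = True
    log: tuple = ()

def record_step(f):
    """Wrap f so it logs its outcome and turns exceptions into a failed Recorded."""
    @functools.wraps(f)  # keep f's name so the log stays readable
    def wrapped(x, log=()):
        try:
            return Recorded(f(x), True, (*log, f"OK: {f.__name__}"))
        except Exception as e:
            return Recorded(None, False, (*log, f"FAILED: {f.__name__}: {e}"))
    return wrapped

def bind_step(r, f):
    """Run the next step on success; on failure, skip it and note that in the log."""
    if r.ok:
        return f(r.value, log=r.log)
    return Recorded(None, False, (*r.log, f"SKIPPED: {f.__name__}"))

@record_step
def basic_cleaning(rows):
    return [r for r in rows if r is not None]

@record_step
def recodings(rows):
    return [float(r) for r in rows]  # raises ValueError on non-numeric text

@record_step
def total(rows):
    return sum(rows)

def run(raw):
    out = basic_cleaning(raw)
    for step in (recodings, total):
        out = bind_step(out, step)
    return out

good = run(["1", None, "2.5"])
bad = run(["1", "oops"])
for line in bad.log:
    print(line)
```

For the bad input, the log shows that basic_cleaning succeeded, that recodings failed and with which error message, and that total was skipped. That is the information a crashed script does not give you.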

8.7 Integrating with rixpress

You can combine the power of {chronicler} with {rixpress} for even more robust pipelines. The key insight is that your user_functions can use record() internally:

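# functions.R
library(chronicler)
library(dplyr)   # select(), mutate(), contains()
library(tidyr)   # pivot_longer()
library(forcats) # fct_recode()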

# Create recorded versions of your functions
r_basic_cleaning <- record(function(data) {
  data |>
    select(contains("TIME"), contains("20")) |>
    pivot_longer(cols = contains("20"),
                 names_to = "date",
                 values_to = "passengers")
})

r_recodings <- record(function(data) {
  data |>
    mutate(tra_meas = fct_recode(tra_meas, ...)) |>
    mutate(passengers = as.numeric(passengers))
})

# Export a pipeline function that uses bind_record
process_aviation_data <- function(raw_data) {
  raw_data |>
    r_basic_cleaning() |>
    bind_record(r_recodings)
}

Then in your {rixpress} pipeline:

library(rixpress)

list(
  rxp_r_file(
    name = avia_raw,
    path = "data/avia.tsv",
    read_function = readr::read_tsv
  ),
  
  rxp_r(
    name = processed_data,
    expr = process_aviation_data(avia_raw),
    user_functions = "functions.R"
  ),
  
  # ... more steps
) |>
  rxp_populate()

The {rixpress} derivation caches the result while {chronicler} provides logging and error handling within the step.

8.8 Python Integration with talvez

The same pattern works in Python pipelines:

# functions.py
from talvez import maybe, chain, just

@maybe()
def basic_cleaning(df):
    return df.dropna().reset_index(drop=True)

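@maybe()
def recodings(df):
    # category_mapping is assumed to be a dict defined elsewhere in this module.
    # assign() returns a new DataFrame instead of mutating the input in place.
    return df.assign(category=df['category'].map(category_mapping))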

@maybe()
def filter_arrivals(df):
    return df[df['tra_meas'] == 'Arrivals']

def process_data(raw_df):
    """Return Just(result), or Nothing if any step fails."""
    return chain(
        just(raw_df),
        basic_cleaning,
        recodings,
        filter_arrivals
    )

Then in your {rixpress} pipeline:

rxp_py(
  name = processed_data,
  expr = "process_data(raw_data)",
  user_functions = "functions.py"
)

8.9 Summary

Monads add a layer of robustness to your pipelines:

  • chronicler (record(), bind_record()): Logging and error handling for R
  • talvez (@maybe(), bind(), chain()): Error handling for Python
  • Just/Nothing: Graceful failure propagation without crashes
  • Composability preserved: Even with extra capabilities, functions still compose

Combined with {rixpress}:

  • Nix provides hermetic, cached execution
  • Monads provide logging and error handling
  • Together they give you robust, reproducible, observable pipelines

This completes our toolkit for reproducible data science: environments (Nix), functional code (FP), pipelines (rixpress), distribution (Docker), and robustness (monads).