BackendTask.Stream

A Stream represents a flow of data through a pipeline.

It is typically

  • An input source, or Readable Stream (Stream { read : (), write : Never })
  • An output destination, or Writable Stream (Stream { read : Never, write : () })
  • And (optionally) a series of transformations in between, or Duplex Streams (Stream { read : (), write : () })

For example, you could have a stream that

For example,

import BackendTask.Stream as Stream exposing (Stream)

example =
    Stream.fileRead "data.txt"
        |> Stream.unzip
        |> Stream.command "wc" [ "-l" ]
        |> Stream.httpWithInput
            { url = "http://example.com"
            , method = "POST"
            , headers = []
            , retries = Nothing
            , timeoutInMs = Nothing
            }
        |> Stream.run

End example

type Stream error metadata kind

Once you've defined a Stream, it can be turned into a BackendTask that will run it (and optionally read its output and metadata).

errorTo
metaTo
{ read : toReadable
, write : ()
}
-> Stream
errorFrom
metaFrom
{ read : ()
, write : fromWriteable
}
-> Stream
errorTo
metaTo
{ read : toReadable
, write : fromWriteable
}

You can build up a pipeline of streams by using the pipe function.

The stream you are piping to must be writable ({ write : () }), and the stream you are piping from must be readable ({ read : () }).

module HelloWorld exposing (run)

import BackendTask
import BackendTask.Stream as Stream
import Pages.Script as Script exposing (Script)

run : Script
run =
    Script.withoutCliOptions
        (Stream.fromString "Hello, World!"
            |> Stream.stdout
            |> Stream.run
        )
String
-> Stream
()
()
{ read : ()
, write : Never
}

Open a file's contents as a Stream.

module ReadFile exposing (run)

import BackendTask
import BackendTask.Stream as Stream
import Pages.Script as Script exposing (Script)

run : Script
run =
    Script.withoutCliOptions
        (Stream.fileRead "elm.json"
            |> Stream.readJson (Decode.field "source-directories" (Decode.list Decode.string))
            |> BackendTask.allowFatal
            |> BackendTask.andThen
                (\{ body } ->
                    Script.log
                        ("The source directories are: "
                            ++ String.join ", " body
                        )
                )
        )

If you want to read a file but don't need to use any of the other Stream functions, you can use BackendTask.File.read instead.

String
-> Stream
()
()
{ read : Never
, write : ()
}

Write a Stream to a file.

module WriteFile exposing (run)

import BackendTask
import BackendTask.Stream as Stream
import Pages.Script as Script exposing (Script)

run : Script
run =
    Script.withoutCliOptions
        (Stream.fileRead "logs.txt"
            |> Stream.pipe (Stream.command "grep" [ "error" ])
            |> Stream.pipe (Stream.fileWrite "errors.txt")
        )
String
-> Stream
()
()
{ read : ()
, write : Never
}

A handy way to turn either a hardcoded String, or any other value from Elm into a Stream.

module HelloWorld exposing (run)

import BackendTask
import BackendTask.Stream as Stream
import Pages.Script as Script exposing (Script)

run : Script
run =
    Script.withoutCliOptions
        (Stream.fromString "Hello, World!"
            |> Stream.stdout
            |> Stream.run
            |> BackendTask.allowFatal
        )

A more programmatic use of fromString to use the result of a previous BackendTask to a Stream:

module HelloWorld exposing (run)

import BackendTask
import BackendTask.Stream as Stream
import Pages.Script as Script exposing (Script)

run : Script
run =
    Script.withoutCliOptions
        (Glob.fromString "src/**/*.elm"
            |> BackendTask.andThen
                (\elmFiles ->
                    elmFiles
                        |> String.join ", "
                        |> Stream.fromString
                        |> Stream.pipe Stream.stdout
                        |> Stream.run
                )
        )
{ url : String
, method : String
, headers : List ( String, String )
, body : Body
, retries : Maybe Int
, timeoutInMs : Maybe Int
}
-> Stream
Metadata
{ read : ()
, write : Never
}

Uses a regular HTTP request body (not a Stream). Streams the HTTP response body.

If you want to pass a stream as the request body, use httpWithInput instead.

If you don't need to stream the response body, you can use the functions from BackendTask.Http instead.

{ url : String
, method : String
, headers : List ( String, String )
, retries : Maybe Int
, timeoutInMs : Maybe Int
}
-> Stream
Metadata
{ read : ()
, write : ()
}

Streams the data from the input stream as the body of the HTTP request. The HTTP response body becomes the output stream.

()
()
{ read : ()
, write : Never
}

The stdin from the process. When you execute an elm-pages script, this will be the value that is piped in to it. For example, given this script module:

module CountLines exposing (run)

import BackendTask
import BackendTask.Stream as Stream
import Pages.Script as Script exposing (Script)

run : Script
run =
    Script.withoutCliOptions
        (Stream.stdin
            |> Stream.read
            |> BackendTask.allowFatal
            |> BackendTask.andThen
                (\{ body } ->
                    body
                        |> String.lines
                        |> List.length
                        |> String.fromInt
                        |> Script.log
                )
        )

If you run the script without any stdin, it will wait until stdin is closed.

elm-pages run script/src/CountLines.elm
# pressing ctrl-d (or your platform-specific way of closing stdin) will print the number of lines in the input

Or you can pipe to it and it will read that input:

ls | elm-pages run script/src/CountLines.elm
# prints the number of files in the current directory
()
()
{ read : Never
, write : ()
}

Streaming through to stdout can be a convenient way to print a pipeline directly without going through to Elm.

module UnzipFile exposing (run)

import BackendTask
import BackendTask.Stream as Stream
import Pages.Script as Script exposing (Script)

run : Script
run =
    Script.withoutCliOptions
        (Stream.fileRead "data.gzip.txt"
            |> Stream.pipe Stream.unzip
            |> Stream.pipe Stream.stdout
            |> Stream.run
            |> BackendTask.allowFatal
        )
()
()
{ read : Never
, write : ()
}

Similar to stdout, but writes to stderr instead.

Running Streams

error
metadata
{ read : ()
, write : write
}
-> BackendTask
{ fatal : FatalError
, recoverable : Error error String
}
{ metadata : metadata
, body : String
}

Read the body of the Stream as text.

Decoder value
-> Stream
error
metadata
{ read : ()
, write : write
}
-> BackendTask
{ fatal : FatalError
, recoverable : Error error value
}
{ metadata : metadata
, body : value
}

Read the body of the Stream as JSON.

module ReadJson exposing (run)

import BackendTask
import BackendTask.Stream as Stream
import Json.Decode as Decode
import Pages.Script as Script exposing (Script)

run : Script
run =
    Script.withoutCliOptions
        (Stream.fileRead "data.json"
            |> Stream.readJson (Decode.field "name" Decode.string)
            |> BackendTask.allowFatal
            |> BackendTask.andThen
                (\{ body } ->
                    Script.log ("The name is: " ++ body)
                )
        )
error
metadata
{ read : read
, write : write
}
-> BackendTask
{ fatal : FatalError
, recoverable : Error error String
}
metadata

Ignore the body of the Stream, while capturing the metadata from the final part of the Stream.

run : Stream error metadata kind -> BackendTask FatalError ()

Gives a BackendTask to execute the Stream, ignoring its body and metadata.

This is useful if you only want the side-effect from the Stream and don't need to programmatically use its output. For example, if the end result you want is:

  • Printing to the console
  • Writing to a file
  • Making an HTTP request

If you need to read the output of the Stream, use read, readJson, or readMetadata instead.

type Error error body
= StreamError String
| CustomError error (Maybe body)

Running or reading a Stream can give one of two kinds of error:

  • StreamError String - when something in the middle of the stream fails
  • CustomError error body - when the Stream fails with a custom error

A CustomError can only come from the final part of the stream.

You can define your own custom errors by decoding metadata to an Err in the ...WithMeta helpers.

Shell Commands

Note that the commands do not execute through a shell but rather directly executes a child process. That means that special shell syntax will have no effect, but instead will be interpreted as literal characters in arguments to the command.

So instead of grep error < log.txt, you would use

module GrepErrors exposing (run)

import BackendTask
import BackendTask.Stream as Stream
import Pages.Script as Script exposing (Script)

run : Script
run =
    Script.withoutCliOptions
        (Stream.fileRead "log.txt"
            |> Stream.pipe (Stream.command "grep" [ "error" ])
            |> Stream.stdout
            |> Stream.run
        )
String
-> List String
-> Stream
Int
()
{ read : read
, write : write
}

Run a command (or child_process). The command's output becomes the body of the Stream.

Command Options

-> String
-> List String
-> Stream
Int
()
{ read : read
, write : write
}

Pass in custom CommandOptions to configure the behavior of the command.

For example, grep will return a non-zero status code if it doesn't find any matches. To ignore the non-zero status code and proceed with empty output, you can use allowNon0Status.

module GrepErrors exposing (run)

import BackendTask
import BackendTask.Stream as Stream
import Pages.Script as Script exposing (Script)

run : Script
run =
    Script.withoutCliOptions
        (Stream.fileRead "log.txt"
            |> Stream.pipe
                (Stream.commandWithOptions
                    (Stream.defaultCommandOptions |> Stream.allowNon0Status)
                    "grep"
                    [ "error" ]
                )
            |> Stream.pipe Stream.stdout
            |> Stream.run
        )
= PrintStderr
| IgnoreStderr
| MergeStderrAndStdout
| StderrInsteadOfStdout

The output configuration for withOutput. The default is PrintStderr.

  • PrintStderr - Print (but do not pass along) the stderr output of the command. Only stdout will be passed along as the body of the stream.
  • IgnoreStderr - Ignore the stderr output of the command, only include stdout
  • MergeStderrAndStdout - Both stderr and stdout will be passed along as the body of the stream.
  • StderrInsteadOfStdout - Only stderr will be passed along as the body of the stream. stdout will be ignored.

Configuration for commandWithOptions.

The default options that are used for command. Used to build up CommandOptions to pass in to commandWithOptions.

By default, the Stream will halt with an error if a command returns a non-zero status code.

With allowNon0Status, the stream will continue without an error if the command returns a non-zero status code.

By default, commands do not have a timeout. This will set the timeout, in milliseconds, for the given command. If that duration is exceeded, the Stream will fail with an error.

Command Output Strategies

There are 3 things that effect the output behavior of a command:

With BackendTask.quiet, the output of the command will not print as it runs, but you still read it in Elm if you read the Stream (instead of using Stream.run).

There are 3 ways to handle the output of a command:

  1. Read the output but don't print
  2. Print the output but don't read
  3. Ignore the output

To read the output (1), use Stream.read or Stream.readJson. This will give you the output as a String or JSON object. Regardless of whether you use BackendTask.quiet, the output will be read and returned to Elm.

To let the output from the command natively print to the console (2), use Stream.run without setting BackendTask.quiet. Based on the command's withOutput configuration, either stderr, stdout, or both will print to the console. The native output will sometimes be treated more like running the command directly in the terminal, for example elm make will print progress messages which will be cleared and updated in place.

To ignore the output (3), use Stream.run with BackendTask.quiet. This will run the command without printing anything to the console. You can also use Stream.read and ignore the captured output, but this is less efficient than using BackendTask.quiet with Stream.run.

Compression Helpers

module CompressionDemo exposing (run)

import BackendTask
import BackendTask.Stream as Stream
import Pages.Script as Script exposing (Script)

run : Script
run =
    Script.withoutCliOptions
        (Stream.fileRead "elm.json"
            |> Stream.pipe Stream.gzip
            |> Stream.pipe (Stream.fileWrite "elm.json.gz")
            |> Stream.run
            |> BackendTask.andThen
                (\_ ->
                    Stream.fileRead "elm.json.gz"
                        |> Stream.pipe Stream.unzip
                        |> Stream.pipe Stream.stdout
                        |> Stream.run
                )
        )
()
()
{ read : ()
, write : ()
}

Transforms the input with gzip compression.

Under the hood this builds a Stream using Node's zlib.createGzip.

()
()
{ read : ()
, write : ()
}

Transforms the input by auto-detecting the header and decompressing either a Gzip- or Deflate-compressed stream.

Under the hood, this builds a Stream using Node's zlib.createUnzip.

Custom Streams

BackendTask.Custom lets you define custom BackendTasks from async NodeJS functions in your custom-backend-task file. Similarly, you can define custom streams with async functions in your custom-backend-task file, returning native NodeJS Streams, and optionally functions to extract metadata.

import { Writable, Transform, Readable } from "node:stream";

export async function upperCaseStream(input, { cwd, env, quiet }) {
  return {
    metadata: () => "Hi! I'm metadata from upperCaseStream!",
    stream: new Transform({
      transform(chunk, encoding, callback) {
        callback(null, chunk.toString().toUpperCase());
      },
    }),
  };
}

export async function customReadStream(input) {
  return new Readable({
    read(size) {
      this.push("Hello from customReadStream!");
      this.push(null);
    },
  });
}

export async function customWriteStream(input, { cwd, env, quiet }) {
  return {
    stream: new Writable({
      write(chunk, encoding, callback) {
        console.error("...received chunk...");
        console.log(chunk.toString());
        callback();
      },
    }),
    metadata: () => {
      return "Hi! I'm metadata from customWriteStream!";
    },
  };
}
module CustomStreamDemo exposing (run)

import BackendTask
import BackendTask.Stream as Stream
import Pages.Script as Script exposing (Script)

run : Script
run =
    Script.withoutCliOptions
        (Stream.customRead "customReadStream" Encode.null
            |> Stream.pipe (Stream.customDuplex "upperCaseStream" Encode.null)
            |> Stream.pipe (Stream.customWrite "customWriteStream" Encode.null)
            |> Stream.run
        )

To extract the metadata from the custom stream, you can use the `...WithMeta` functions:

module CustomStreamDemoWithMeta exposing (run)

import BackendTask
import BackendTask.Stream as Stream
import Pages.Script as Script exposing (Script)

run : Script
run =
    Script.withoutCliOptions
        (Stream.customReadWithMeta "customReadStream" Encode.null Decode.succeed
            |> Stream.pipe (Stream.customTransformWithMeta "upperCaseStream" Encode.null Decode.succeed)
            |> Stream.readMetadata
            |> BackendTask.allowFatal
            |> BackendTask.andThen
                (\metadata ->
                    Script.log ("Metadata: " ++ metadata)
                )
        )
    --> Script.log "Metadata: Hi! I'm metadata from upperCaseStream!"
String
-> Value
-> Stream
()
()
{ read : ()
, write : Never
}

Calls an async function from your custom-backend-task definitions and uses the NodeJS ReadableStream it returns.

String
-> Value
-> Stream
()
()
{ read : Never
, write : ()
}

Calls an async function from your custom-backend-task definitions and uses the NodeJS WritableStream it returns.

String
-> Value
-> Stream
()
()
{ read : ()
, write : ()
}

Calls an async function from your custom-backend-task definitions and uses the NodeJS DuplexStream it returns.

With Metadata Decoders

String
-> Value
-> Decoder
(Result
{ fatal : FatalError
, recoverable : error
}
metadata
)
-> Stream
error
metadata
{ read : ()
, write : Never
}

Calls an async function from your custom-backend-task definitions and uses the NodeJS DuplexStream it returns.

String
-> Value
-> Decoder
(Result
{ fatal : FatalError
, recoverable : error
}
metadata
)
-> Stream
error
metadata
{ read : ()
, write : ()
}

Calls an async function from your custom-backend-task definitions and uses the NodeJS DuplexStream and metadata function it returns.

String
-> Value
-> Decoder
(Result
{ fatal : FatalError
, recoverable : error
}
metadata
)
-> Stream
error
metadata
{ read : Never
, write : ()
}

Calls an async function from your custom-backend-task definitions and uses the NodeJS WritableStream and metadata function it returns.