A Stream represents a flow of data through a pipeline.
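It is typically made up of:
- An input source, or readable stream: Stream { read : (), write : Never }
- An output destination, or writable stream: Stream { read : Never, write : () }
- Optionally, one or more transformations in between, or duplex streams: Stream { read : (), write : () }
For example, you could have a stream that:
- Reads from a file (fileRead)
- Unzips the contents (unzip)
- Runs a command on the contents (command)
- Sends the result as the body of an HTTP request (httpWithInput)
As code, that pipeline looks like this: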
    import BackendTask.Stream as Stream exposing (Stream)

    example =
        Stream.fileRead "data.txt"
            |> Stream.pipe Stream.unzip
            |> Stream.pipe (Stream.command "wc" [ "-l" ])
            |> Stream.pipe
                (Stream.httpWithInput
                    { url = "http://example.com"
                    , method = "POST"
                    , headers = []
                    , retries = Nothing
                    , timeoutInMs = Nothing
                    }
                )
            |> Stream.run
End example
You can build up a pipeline of streams by using the pipe function.
The stream you are piping to must be writable ({ write : () }),
and the stream you are piping from must be readable ({ read : () }).
    module HelloWorld exposing (run)

    import BackendTask
    import BackendTask.Stream as Stream
    import Pages.Script as Script exposing (Script)

    run : Script
    run =
        Script.withoutCliOptions
            (Stream.fromString "Hello, World!"
                |> Stream.pipe Stream.stdout
                |> Stream.run
            )
Open a file's contents as a Stream.
    module ReadFile exposing (run)

    import BackendTask
    import BackendTask.Stream as Stream
    import Json.Decode as Decode
    import Pages.Script as Script exposing (Script)

    run : Script
    run =
        Script.withoutCliOptions
            (Stream.fileRead "elm.json"
                |> Stream.readJson (Decode.field "source-directories" (Decode.list Decode.string))
                |> BackendTask.allowFatal
                |> BackendTask.andThen
                    (\{ body } ->
                        Script.log
                            ("The source directories are: "
                                ++ String.join ", " body
                            )
                    )
            )
If you want to read a file but don't need to use any of the other Stream functions, you can use BackendTask.File.read instead.
Write a Stream to a file.
    module WriteFile exposing (run)

    import BackendTask
    import BackendTask.Stream as Stream
    import Pages.Script as Script exposing (Script)

    run : Script
    run =
        Script.withoutCliOptions
            (Stream.fileRead "logs.txt"
                |> Stream.pipe (Stream.command "grep" [ "error" ])
                |> Stream.pipe (Stream.fileWrite "errors.txt")
                |> Stream.run
            )
A handy way to turn either a hardcoded String, or any other value from Elm into a Stream.
    module HelloWorld exposing (run)

    import BackendTask
    import BackendTask.Stream as Stream
    import Pages.Script as Script exposing (Script)

    run : Script
    run =
        Script.withoutCliOptions
            (Stream.fromString "Hello, World!"
                |> Stream.pipe Stream.stdout
                |> Stream.run
                |> BackendTask.allowFatal
            )
A more programmatic use of fromString, turning the result of a previous BackendTask into a Stream:
    module HelloWorld exposing (run)

    import BackendTask
    import BackendTask.Glob as Glob
    import BackendTask.Stream as Stream
    import Pages.Script as Script exposing (Script)

    run : Script
    run =
        Script.withoutCliOptions
            (Glob.fromString "src/**/*.elm"
                |> BackendTask.andThen
                    (\elmFiles ->
                        elmFiles
                            |> String.join ", "
                            |> Stream.fromString
                            |> Stream.pipe Stream.stdout
                            |> Stream.run
                    )
            )
Uses a regular HTTP request body (not a Stream). Streams the HTTP response body.
If you want to pass a stream as the request body, use httpWithInput instead.
If you don't need to stream the response body, you can use the functions from BackendTask.Http instead.
Streams the data from the input stream as the body of the HTTP request. The HTTP response body becomes the output stream.
The stdin from the process. When you execute an elm-pages script, this will be the value that is piped in to it. For example, given this script module:
    module CountLines exposing (run)

    import BackendTask
    import BackendTask.Stream as Stream
    import Pages.Script as Script exposing (Script)

    run : Script
    run =
        Script.withoutCliOptions
            (Stream.stdin
                |> Stream.read
                |> BackendTask.allowFatal
                |> BackendTask.andThen
                    (\{ body } ->
                        body
                            |> String.lines
                            |> List.length
                            |> String.fromInt
                            |> Script.log
                    )
            )
If you run the script without any stdin, it will wait until stdin is closed.
    elm-pages run script/src/CountLines.elm
    # pressing ctrl-d (or your platform-specific way of closing stdin) will print the number of lines in the input
Or you can pipe to it and it will read that input:
    ls | elm-pages run script/src/CountLines.elm
    # prints the number of files in the current directory
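For comparison, here is a rough Node.js sketch of the same idea outside elm-pages: read a readable stream to its end, then count the lines. It is not how elm-pages implements Stream.stdin. The countLines helper and the demo input are hypothetical names chosen for illustration.

```javascript
const { Readable } = require("node:stream");

// Hypothetical helper: collect every chunk until the stream ends,
// then split on line breaks and count the pieces.
async function countLines(readable) {
  let text = "";
  for await (const chunk of readable) {
    text += chunk.toString();
  }
  // A trailing newline would add one empty final piece to this count.
  return text.split(/\r\n|\r|\n/).length;
}

// Stand-in for piped input, delivered in two chunks.
const demoInput = Readable.from(["a.txt\nb.txt\n", "c.txt"]);
const demoResult = countLines(demoInput);
```

In a real Node script you would pass process.stdin instead of demoInput. As with the Elm script, nothing is returned until the input is closed.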
Streaming through to stdout can be a convenient way to print a pipeline directly without going through to Elm.
    module UnzipFile exposing (run)

    import BackendTask
    import BackendTask.Stream as Stream
    import Pages.Script as Script exposing (Script)

    run : Script
    run =
        Script.withoutCliOptions
            (Stream.fileRead "data.gzip.txt"
                |> Stream.pipe Stream.unzip
                |> Stream.pipe Stream.stdout
                |> Stream.run
                |> BackendTask.allowFatal
            )
Similar to stdout, but writes to stderr instead.
Read the body of the Stream as text.
Read the body of the Stream as JSON.
    module ReadJson exposing (run)

    import BackendTask
    import BackendTask.Stream as Stream
    import Json.Decode as Decode
    import Pages.Script as Script exposing (Script)

    run : Script
    run =
        Script.withoutCliOptions
            (Stream.fileRead "data.json"
                |> Stream.readJson (Decode.field "name" Decode.string)
                |> BackendTask.allowFatal
                |> BackendTask.andThen
                    (\{ body } ->
                        Script.log ("The name is: " ++ body)
                    )
            )
Ignore the body of the Stream, while capturing the metadata from the final part of the Stream.
Gives a BackendTask to execute the Stream, ignoring its body and metadata.
This is useful if you only want the side effect of the Stream and don't need to use its output programmatically, for example when the end result you want is printing to the console or writing to a file.
If you need to read the output of the Stream, use read, readJson, or readMetadata instead.
Running or reading a Stream can give one of two kinds of error:
- StreamError String: when something in the middle of the stream fails
- CustomError error body: when the Stream fails with a custom error
A CustomError can only come from the final part of the stream.
You can define your own custom errors by decoding metadata to an Err in the ...WithMeta helpers.
Note that commands do not execute through a shell; each one runs directly as a child process. That means special shell syntax has no effect and is instead interpreted as literal characters in the arguments to the command.
So instead of grep error < log.txt, you would use:
    module GrepErrors exposing (run)

    import BackendTask
    import BackendTask.Stream as Stream
    import Pages.Script as Script exposing (Script)

    run : Script
    run =
        Script.withoutCliOptions
            (Stream.fileRead "log.txt"
                |> Stream.pipe (Stream.command "grep" [ "error" ])
                |> Stream.pipe Stream.stdout
                |> Stream.run
            )
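To see what "no shell" means in practice, the following Node.js sketch spawns a child process directly and reports the arguments it received. It illustrates the general behavior of spawning without a shell, not the elm-pages internals. The argsSeenByChild helper is a hypothetical name, and the child is simply the current Node binary.

```javascript
const { spawnSync } = require("node:child_process");

// Child program that prints the arguments it received as JSON.
const printArgs = "console.log(JSON.stringify(process.argv.slice(1)))";

// Hypothetical helper: run the current Node binary directly (no shell)
// and return the argument list the child process actually saw.
function argsSeenByChild(args) {
  const result = spawnSync(process.execPath, ["-e", printArgs, ...args], {
    encoding: "utf8",
  });
  return JSON.parse(result.stdout);
}

// "<" is not treated as a redirect; it arrives as an ordinary argument.
const seen = argsSeenByChild(["error", "<", "log.txt"]);
console.log(seen);
```

Because the redirect character is passed through literally, input has to come from a stream in the pipeline (such as fileRead) rather than from shell syntax.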
Run a command (or child_process). The command's output becomes the body of the Stream.
Pass in custom CommandOptions to configure the behavior of the command.
For example, grep will return a non-zero status code if it doesn't find any matches. To ignore the non-zero status code and proceed with
empty output, you can use allowNon0Status.
    module GrepErrors exposing (run)

    import BackendTask
    import BackendTask.Stream as Stream
    import Pages.Script as Script exposing (Script)

    run : Script
    run =
        Script.withoutCliOptions
            (Stream.fileRead "log.txt"
                |> Stream.pipe
                    (Stream.commandWithOptions
                        (Stream.defaultCommandOptions |> Stream.allowNon0Status)
                        "grep"
                        [ "error" ]
                    )
                |> Stream.pipe Stream.stdout
                |> Stream.run
            )
The output configuration for withOutput. The default is PrintStderr.
- PrintStderr: Print (but do not pass along) the stderr output of the command. Only stdout will be passed along as the body of the stream.
- IgnoreStderr: Ignore the stderr output of the command, only include stdout.
- MergeStderrAndStdout: Both stderr and stdout will be passed along as the body of the stream.
- StderrInsteadOfStdout: Only stderr will be passed along as the body of the stream. stdout will be ignored.
Configuration for commandWithOptions.
The default options that are used for command. Used to build up CommandOptions
to pass in to commandWithOptions.
By default, the Stream will halt with an error if a command returns a non-zero status code.
With allowNon0Status, the stream will continue without an error if the command returns a non-zero status code.
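The difference between the two behaviors can be sketched at the child-process level in Node.js. This is a minimal illustration under stated assumptions, not the elm-pages implementation: runCommand and its allowNon0Status option are hypothetical names that mirror the Elm option, and the child is the current Node binary exiting with status 1.

```javascript
const { spawnSync } = require("node:child_process");

// Hypothetical helper, not the elm-pages implementation: run a command
// and fail on a non-zero exit code unless the caller opts out.
function runCommand(command, args, { allowNon0Status = false } = {}) {
  const result = spawnSync(command, args, { encoding: "utf8" });
  if (result.error) throw result.error;
  if (result.status !== 0 && !allowNon0Status) {
    throw new Error(`${command} exited with status ${result.status}`);
  }
  return result.stdout;
}

// A child that prints nothing and exits with status 1,
// similar to grep when it finds no matches.
const noMatches = ["-e", "process.exit(1)"];

// With the opt-out, the empty output is returned instead of an error.
const output = runCommand(process.execPath, noMatches, { allowNon0Status: true });
```

Calling runCommand without the option on the same child throws, which corresponds to the default halting behavior described above.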
Configure the StderrOutput behavior.
By default, commands do not have a timeout. This will set the timeout, in milliseconds, for the given command. If that duration is exceeded,
the Stream will fail with an error.
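As a rough analogue, Node's own child_process API accepts a timeout in milliseconds and reports an error when it is exceeded. The sketch below shows that mechanism only; whether elm-pages uses the same option internally is an assumption, and timedOut is a hypothetical helper name.

```javascript
const { spawnSync } = require("node:child_process");

// Hypothetical helper: run a command with a time limit in milliseconds.
// Node kills the child and reports ETIMEDOUT when the limit is exceeded.
function timedOut(command, args, timeoutInMs) {
  const result = spawnSync(command, args, { timeout: timeoutInMs });
  return result.error !== undefined && result.error.code === "ETIMEDOUT";
}

// A child that would otherwise stay alive for ten seconds.
const sleeper = ["-e", "setTimeout(() => {}, 10000)"];
const exceeded = timedOut(process.execPath, sleeper, 300);
```

A command that finishes inside the limit produces no timeout error, so the same helper returns false for a short-lived child.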
There are 3 things that affect the output behavior of a command:
- The BackendTask context (BackendTask.quiet)
- Whether the Stream output is ignored (Stream.run) or read (Stream.read)
- withOutput (allows you to use stdout, stderr, or both)
With BackendTask.quiet, the output of the command will not print as it runs, but you can still read it in Elm if you read the Stream (instead of using Stream.run).
There are 3 ways to handle the output of a command:
1. Read the output
2. Let the output print to the console
3. Ignore the output
To read the output (1), use Stream.read or Stream.readJson. This will give you the output as a String or JSON object.
Regardless of whether you use BackendTask.quiet, the output will be read and returned to Elm.
To let the output from the command natively print to the console (2), use Stream.run without setting BackendTask.quiet. Based on
the command's withOutput configuration, either stderr, stdout, or both will print to the console. The native output will
sometimes be treated more like running the command directly in the terminal, for example elm make will print progress
messages which will be cleared and updated in place.
To ignore the output (3), use Stream.run with BackendTask.quiet. This will run the command without printing anything to the console.
You can also use Stream.read and ignore the captured output, but this is less efficient than using BackendTask.quiet with Stream.run.
    module CompressionDemo exposing (run)

    import BackendTask
    import BackendTask.Stream as Stream
    import Pages.Script as Script exposing (Script)

    run : Script
    run =
        Script.withoutCliOptions
            (Stream.fileRead "elm.json"
                |> Stream.pipe Stream.gzip
                |> Stream.pipe (Stream.fileWrite "elm.json.gz")
                |> Stream.run
                |> BackendTask.andThen
                    (\_ ->
                        Stream.fileRead "elm.json.gz"
                            |> Stream.pipe Stream.unzip
                            |> Stream.pipe Stream.stdout
                            |> Stream.run
                    )
            )
Transforms the input with gzip compression.
Under the hood this builds a Stream using Node's zlib.createGzip.
Transforms the input by auto-detecting the header and decompressing either a Gzip- or Deflate-compressed stream.
Under the hood, this builds a Stream using Node's zlib.createUnzip.
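The two Node transforms named above can be tried directly. The following is a minimal sketch that pipes text through zlib.createGzip and then zlib.createUnzip and checks that it comes back unchanged. The gzipRoundTrip helper is a hypothetical name, and the in-memory source and sink stand in for the file streams used in the Elm example.

```javascript
const zlib = require("node:zlib");
const { Readable, Writable } = require("node:stream");
const { pipeline } = require("node:stream/promises");

// Hypothetical helper: gzip a string, then decompress it again, all as streams.
async function gzipRoundTrip(text) {
  const chunks = [];
  // In-memory sink that collects whatever reaches the end of the pipeline.
  const collect = new Writable({
    write(chunk, encoding, callback) {
      chunks.push(chunk);
      callback();
    },
  });
  await pipeline(
    Readable.from([Buffer.from(text)]),
    zlib.createGzip(), // compress
    zlib.createUnzip(), // detects the gzip (or deflate) header and decompresses
    collect
  );
  return Buffer.concat(chunks).toString();
}

const roundTripped = gzipRoundTrip("Hello, Stream!");
```

The promise resolves to the original text, which is the same round trip the CompressionDemo script performs through a file on disk.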
BackendTask.Custom lets you define custom BackendTasks from async NodeJS functions in your custom-backend-task file.
Similarly, you can define custom streams with async functions in your custom-backend-task file, returning native NodeJS Streams, and optionally functions to extract metadata.
    import { Writable, Transform, Readable } from "node:stream";

    export async function upperCaseStream(input, { cwd, env, quiet }) {
      return {
        metadata: () => "Hi! I'm metadata from upperCaseStream!",
        stream: new Transform({
          transform(chunk, encoding, callback) {
            callback(null, chunk.toString().toUpperCase());
          },
        }),
      };
    }

    export async function customReadStream(input) {
      return new Readable({
        read(size) {
          this.push("Hello from customReadStream!");
          this.push(null);
        },
      });
    }

    export async function customWriteStream(input, { cwd, env, quiet }) {
      return {
        stream: new Writable({
          write(chunk, encoding, callback) {
            console.error("...received chunk...");
            console.log(chunk.toString());
            callback();
          },
        }),
        metadata: () => {
          return "Hi! I'm metadata from customWriteStream!";
        },
      };
    }
    module CustomStreamDemo exposing (run)

    import BackendTask
    import BackendTask.Stream as Stream
    import Json.Encode as Encode
    import Pages.Script as Script exposing (Script)

    run : Script
    run =
        Script.withoutCliOptions
            (Stream.customRead "customReadStream" Encode.null
                |> Stream.pipe (Stream.customDuplex "upperCaseStream" Encode.null)
                |> Stream.pipe (Stream.customWrite "customWriteStream" Encode.null)
                |> Stream.run
            )
To extract the metadata from the custom stream, you can use the `...WithMeta` functions:
    module CustomStreamDemoWithMeta exposing (run)

    import BackendTask
    import BackendTask.Stream as Stream
    import Json.Decode as Decode
    import Json.Encode as Encode
    import Pages.Script as Script exposing (Script)

    run : Script
    run =
        Script.withoutCliOptions
            (Stream.customReadWithMeta "customReadStream" Encode.null Decode.succeed
                |> Stream.pipe (Stream.customTransformWithMeta "upperCaseStream" Encode.null Decode.succeed)
                |> Stream.readMetadata
                |> BackendTask.allowFatal
                |> BackendTask.andThen
                    (\metadata ->
                        Script.log ("Metadata: " ++ metadata)
                    )
            )

    --> Script.log "Metadata: Hi! I'm metadata from upperCaseStream!"
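Before wiring a custom stream into Elm, it can help to exercise the JavaScript side on its own. The sketch below repeats the upperCaseStream shape from the custom-backend-task example above and runs it through a small harness in plain Node. The tryUpperCase harness is hypothetical, and it simply calls the metadata function after the stream has been consumed; exactly when elm-pages calls it is not something this sketch claims to reproduce.

```javascript
const { Readable, Transform } = require("node:stream");

// Same shape as upperCaseStream above: an async function that returns
// a Node stream plus a metadata function.
async function upperCaseStream(input, context) {
  return {
    metadata: () => "Hi! I'm metadata from upperCaseStream!",
    stream: new Transform({
      transform(chunk, encoding, callback) {
        callback(null, chunk.toString().toUpperCase());
      },
    }),
  };
}

// Hypothetical harness: feed text through the transform and collect the result.
async function tryUpperCase(text) {
  const { stream, metadata } = await upperCaseStream(null, {});
  let body = "";
  for await (const chunk of Readable.from([text]).pipe(stream)) {
    body += chunk.toString();
  }
  return { body, metadata: metadata() };
}

const tried = tryUpperCase("hello from a custom stream");
```

The resolved value holds the upper-cased body alongside the metadata string, which are the two things the Elm side can read from a custom stream.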
Calls an async function from your custom-backend-task definitions and uses the NodeJS ReadableStream it returns.
Calls an async function from your custom-backend-task definitions and uses the NodeJS WritableStream it returns.
Calls an async function from your custom-backend-task definitions and uses the NodeJS DuplexStream it returns.
Calls an async function from your custom-backend-task definitions and uses the NodeJS DuplexStream it returns.
Calls an async function from your custom-backend-task definitions and uses the NodeJS DuplexStream and metadata function it returns.
Calls an async function from your custom-backend-task definitions and uses the NodeJS WritableStream and metadata function it returns.
Once you've defined a Stream, it can be turned into a BackendTask that will run it (and optionally read its output and metadata).