Demux overview
Common demux utilities for hijacked docker streams.
Added in v1.0.0
Table of contents
Demux
demuxAnyToSeparateSinks
Demux a single raw socket, multiplexed socket, or multiple raw sockets to two sinks. If given a single raw stream socket, then stdout and stderr will be combined on the same sink. If given a multiplexed stream socket, then stdout and stderr will be forwarded to different sinks. If given multiple raw stream sockets, then you can choose which ones to provide. The return type will depend on the type of socket provided, so this isn’t suitable for all use cases. If you need a unified signature, you should use {@link demuxUnknownToSeparateSinks}.
Signature
export declare const demuxAnyToSeparateSinks: {
<A1, A2, E1, E2, E3, R1, R2, R3>(
socket: RawStreamSocket,
source: Stream.Stream<string | Uint8Array, E1, R1>,
sink1: Sink.Sink<A1, string, string, E2, R2>,
sink2: Sink.Sink<A2, string, string, E3, R3>,
options?: { encoding?: string | undefined } | undefined
): Effect.Effect<
A1,
E1 | E2 | E3 | Socket.SocketError,
Exclude<R1, Scope.Scope> | Exclude<R2, Scope.Scope> | Exclude<R3, Scope.Scope>
>
<A1, A2, E1, E2, E3, R1, R2, R3>(
source: Stream.Stream<string | Uint8Array, E1, R1>,
sink1: Sink.Sink<A1, string, string, E2, R2>,
sink2: Sink.Sink<A2, string, string, E3, R3>,
options?: { encoding?: string | undefined } | undefined
): (
socket: RawStreamSocket
) => Effect.Effect<
A1,
E1 | E2 | E3 | Socket.SocketError,
Exclude<R1, Scope.Scope> | Exclude<R2, Scope.Scope> | Exclude<R3, Scope.Scope>
>
<A1, A2, E1, E2, E3, R1, R2, R3>(
socket: MultiplexedStreamSocket,
source: Stream.Stream<string | Uint8Array, E1, R1>,
sink1: Sink.Sink<A1, string, string, E2, R2>,
sink2: Sink.Sink<A2, string, string, E3, R3>,
options?: { bufferSize?: number | undefined; encoding?: string | undefined } | undefined
): Effect.Effect<
CompressedDemuxOutput<A1, A2>,
E1 | E2 | E3 | Socket.SocketError | ParseResult.ParseError,
Exclude<R1, Scope.Scope> | Exclude<R2, Scope.Scope> | Exclude<R3, Scope.Scope>
>
<A1, A2, E1, E2, E3, R1, R2, R3>(
source: Stream.Stream<string | Uint8Array, E1, R1>,
sink1: Sink.Sink<A1, string, string, E2, R2>,
sink2: Sink.Sink<A2, string, string, E3, R3>,
options?: { bufferSize?: number | undefined; encoding?: string | undefined } | undefined
): (
socket: MultiplexedStreamSocket
) => Effect.Effect<
CompressedDemuxOutput<A1, A2>,
E1 | E2 | E3 | Socket.SocketError | ParseResult.ParseError,
Exclude<R1, Scope.Scope> | Exclude<R2, Scope.Scope> | Exclude<R3, Scope.Scope>
>
<A1, A2, E1, E2, E3, R1, R2, R3, SocketOptions extends Demux.StdinStdoutStderrSocketOptions>(
sockets: SocketOptions,
source: Stream.Stream<string | Uint8Array, E1, R1>,
sink1: Sink.Sink<A1, string, string, E2, R2>,
sink2: Sink.Sink<A2, string, string, E3, R3>,
options?: { encoding?: string | undefined } | undefined
): Effect.Effect<
CompressedStdinStdoutStderrOutput<SocketOptions, A1, A2>,
E1 | E2 | E3 | Socket.SocketError,
Exclude<R1, Scope.Scope> | Exclude<R2, Scope.Scope> | Exclude<R3, Scope.Scope>
>
<A1, A2, E1, E2, E3, R1, R2, R3, SocketOptions extends Demux.StdinStdoutStderrSocketOptions>(
source: Stream.Stream<string | Uint8Array, E1, R1>,
sink1: Sink.Sink<A1, string, string, E2, R2>,
sink2: Sink.Sink<A2, string, string, E3, R3>,
options?: { encoding?: string | undefined } | undefined
): (
sockets: SocketOptions
) => Effect.Effect<
CompressedStdinStdoutStderrOutput<SocketOptions, A1, A2>,
E1 | E2 | E3 | Socket.SocketError,
Exclude<R1, Scope.Scope> | Exclude<R2, Scope.Scope> | Exclude<R3, Scope.Scope>
>
}
Added in v1.0.0
demuxToSeparateSinks
Demux either a multiplexed socket or multiple raw socket to separate sinks. If you need to also demux a single raw socket, then use {@link demuxUnknownToSeparateSinks} instead.
Signature
export declare const demuxToSeparateSinks: {
<A1, A2, E1, E2, E3, R1, R2, R3>(
socket: MultiplexedStreamSocket,
source: Stream.Stream<string | Uint8Array, E1, R1>,
sink1: Sink.Sink<A1, string, string, E2, R2>,
sink2: Sink.Sink<A2, string, string, E3, R3>,
options?: { bufferSize?: number | undefined; encoding?: string | undefined } | undefined
): Effect.Effect<
CompressedDemuxOutput<A1, A2>,
E1 | E2 | E3 | Socket.SocketError | ParseResult.ParseError,
Exclude<R1, Scope.Scope> | Exclude<R2, Scope.Scope> | Exclude<R3, Scope.Scope>
>
<A1, A2, E1, E2, E3, R1, R2, R3>(
source: Stream.Stream<string | Uint8Array, E1, R1>,
sink1: Sink.Sink<A1, string, string, E2, R2>,
sink2: Sink.Sink<A2, string, string, E3, R3>,
options?: { bufferSize?: number | undefined; encoding?: string | undefined } | undefined
): (
socket: MultiplexedStreamSocket
) => Effect.Effect<
CompressedDemuxOutput<A1, A2>,
E1 | E2 | E3 | Socket.SocketError | ParseResult.ParseError,
Exclude<R1, Scope.Scope> | Exclude<R2, Scope.Scope> | Exclude<R3, Scope.Scope>
>
<A1, A2, E1, E2, E3, R1, R2, R3, SocketOptions extends Demux.StdinStdoutStderrSocketOptions>(
sockets: SocketOptions,
source: Stream.Stream<string | Uint8Array, E1, R1>,
sink1: Sink.Sink<A1, string, string, E2, R2>,
sink2: Sink.Sink<A2, string, string, E3, R3>,
options?: { encoding?: string | undefined } | undefined
): Effect.Effect<
CompressedStdinStdoutStderrOutput<SocketOptions, A1, A2>,
E1 | E2 | E3 | Socket.SocketError,
Exclude<R1, Scope.Scope> | Exclude<R2, Scope.Scope> | Exclude<R3, Scope.Scope>
>
<A1, A2, E1, E2, E3, R1, R2, R3, SocketOptions extends Demux.StdinStdoutStderrSocketOptions>(
source: Stream.Stream<string | Uint8Array, E1, R1>,
sink1: Sink.Sink<A1, string, string, E2, R2>,
sink2: Sink.Sink<A2, string, string, E3, R3>,
options?: { encoding?: string | undefined } | undefined
): (
sockets: SocketOptions
) => Effect.Effect<
CompressedStdinStdoutStderrOutput<SocketOptions, A1, A2>,
E1 | E2 | E3 | Socket.SocketError,
Exclude<R1, Scope.Scope> | Exclude<R2, Scope.Scope> | Exclude<R3, Scope.Scope>
>
}
Example
// Demux a multiplexed socket to two sinks
import * as NodeRuntime from "@effect/platform-node/NodeRuntime"
import * as Chunk from "effect/Chunk"
import * as Effect from "effect/Effect"
import * as Function from "effect/Function"
import * as Layer from "effect/Layer"
import * as Sink from "effect/Sink"
import * as Stream from "effect/Stream"
import * as MobyConnection from "the-moby-effect/MobyConnection"
import * as MobyConvey from "the-moby-effect/MobyConvey"
import * as MobyDemux from "the-moby-effect/MobyDemux"
import * as MobyEndpoints from "the-moby-effect/MobyEndpoints"
import * as DockerEngine from "the-moby-effect/DockerEngine"
const layer = Function.pipe(
MobyConnection.connectionOptionsFromPlatformSystemSocketDefault,
Effect.map(DockerEngine.layerNodeJS),
Layer.unwrapEffect
)
Effect.gen(function* () {
const image = "ubuntu:latest"
const containers = yield* MobyEndpoints.Containers
// Pull the image, which will be removed when the scope is closed
const pullStream = DockerEngine.pull({ image })
yield* MobyConvey.followProgressInConsole(pullStream)
// Start a container, which will be removed when the scope is closed
const { Id: containerId } = yield* DockerEngine.runScoped({
spec: {
Image: image,
OpenStdin: true,
Cmd: ["bash", "-c", 'read | md5sum && >&2 echo "Hi2"']
}
})
// Since the container was started with "tty: false",
// we should get a multiplexed socket here
const socket: MobyDemux.RawStreamSocket | MobyDemux.MultiplexedStreamSocket = yield* containers.attach(containerId, {
stdin: true,
stdout: true,
stderr: true,
stream: true
})
assert.ok(MobyDemux.isMultiplexedStreamSocket(socket), "Expected a multiplexed stream socket")
// Demux to a separate sinks
const [stdoutData, stderrData] = yield* MobyDemux.demuxToSeparateSinks(
socket,
Stream.make("a\n"),
Sink.mkString,
Sink.mkString
)
assert.strictEqual(stdoutData, "d41d8cd98f00b204e9800998ecf8427e -\n")
assert.strictEqual(stderrData, "Hi2\n")
// Wait for the container to exit
yield* containers.wait(containerId)
})
.pipe(Effect.scoped)
.pipe(Effect.provide(layer))
.pipe(NodeRuntime.runMain)
Example
// Demux multiple raw sockets to two sinks
import * as NodeRuntime from "@effect/platform-node/NodeRuntime"
import * as Chunk from "effect/Chunk"
import * as Effect from "effect/Effect"
import * as Function from "effect/Function"
import * as Layer from "effect/Layer"
import * as Sink from "effect/Sink"
import * as Stream from "effect/Stream"
import * as MobyConnection from "the-moby-effect/MobyConnection"
import * as MobyConvey from "the-moby-effect/MobyConvey"
import * as MobyDemux from "the-moby-effect/MobyDemux"
import * as MobyEndpoints from "the-moby-effect/MobyEndpoints"
import * as DockerEngine from "the-moby-effect/engines/Docker"
const layer = Function.pipe(
MobyConnection.connectionOptionsFromPlatformSystemSocketDefault,
Effect.map(DockerEngine.layerNodeJS),
Layer.unwrapEffect
)
Effect.gen(function* () {
const image = "ubuntu:latest"
const containers = yield* MobyEndpoints.Containers
// Pull the image, which will be removed when the scope is closed
const pullStream = DockerEngine.pull({ image })
yield* MobyConvey.followProgressInConsole(pullStream)
// Start a container, which will be removed when the scope is closed
const { Id: containerId } = yield* DockerEngine.runScoped({
spec: {
Image: image,
// Tty: true,
OpenStdin: true,
Cmd: ["bash", "-c", 'read | md5sum && >&2 echo "Hi2"']
}
})
// It doesn't matter what tty option we start the container
// with here, we will only get a raw socket
const stdinSocket: MobyDemux.RawStreamSocket = yield* containers.attachWebsocket(containerId, {
stdin: true,
stream: true
})
const stdoutSocket: MobyDemux.RawStreamSocket = yield* containers.attachWebsocket(containerId, {
stdout: true,
stream: true
})
const stderrSocket: MobyDemux.RawStreamSocket = yield* containers.attachWebsocket(containerId, {
stderr: true,
stream: true
})
assert.ok(MobyDemux.isRawStreamSocket(stdinSocket), "Expected a raw socket")
assert.ok(MobyDemux.isRawStreamSocket(stdoutSocket), "Expected a raw socket")
assert.ok(MobyDemux.isRawStreamSocket(stderrSocket), "Expected a raw socket")
// Demux to a separate sinks
const [stdoutData, stderrData] = yield* MobyDemux.demuxToSeparateSinks(
{
stdin: stdinSocket,
stdout: stdoutSocket,
stderr: stderrSocket
},
Stream.make("a\n"),
Sink.mkString,
Sink.mkString
)
assert.strictEqual(stdoutData, "d41d8cd98f00b204e9800998ecf8427e -\n")
assert.strictEqual(stderrData, "Hi2\n")
// Wait for the container to exit
yield* containers.wait(containerId)
})
.pipe(Effect.scoped)
.pipe(Effect.provide(layer))
.pipe(NodeRuntime.runMain)
Example
// Demux single raw socket to two sinks
import * as NodeRuntime from "@effect/platform-node/NodeRuntime"
import * as Chunk from "effect/Chunk"
import * as Effect from "effect/Effect"
import * as Function from "effect/Function"
import * as Layer from "effect/Layer"
import * as Sink from "effect/Sink"
import * as Stream from "effect/Stream"
import * as MobyConnection from "the-moby-effect/MobyConnection"
import * as MobyConvey from "the-moby-effect/MobyConvey"
import * as MobyDemux from "the-moby-effect/MobyDemux"
import * as MobyEndpoints from "the-moby-effect/MobyEndpoints"
import * as DockerEngine from "the-moby-effect/DockerEngine"
const layer = Function.pipe(
MobyConnection.connectionOptionsFromPlatformSystemSocketDefault,
Effect.map(DockerEngine.layerNodeJS),
Layer.unwrapEffect
)
Effect.gen(function* () {
const image = "ubuntu:latest"
const containers = yield* MobyEndpoints.Containers
// Pull the image, which will be removed when the scope is closed
const pullStream = DockerEngine.pull({ image })
yield* MobyConvey.followProgressInConsole(pullStream)
// Start a container, which will be removed when the scope is closed
const { Id: containerId } = yield* DockerEngine.runScoped({
spec: {
Image: image,
// Tty: true,
Cmd: ["bash", "-c", 'sleep 2s && echo "Hi" && >&2 echo "Hi2"']
}
})
// It doesn't matter what tty option we start the container
// with here, we will only get a raw socket
const stdoutSocket: MobyDemux.RawStreamSocket = yield* containers.attachWebsocket(containerId, {
stdout: true,
stream: true
})
assert.ok(MobyDemux.isRawStreamSocket(stdoutSocket), "Expected a raw socket")
// Demux to a separate sinks
const [stdoutData, stderrData] = yield* MobyDemux.demuxToSeparateSinks(
{ stdout: stdoutSocket },
Stream.make("a\n"),
Sink.mkString,
Sink.mkString
)
assert.strictEqual(stdoutData, "Hi\n")
assert.strictEqual(stderrData, undefined)
// Wait for the container to exit
yield* containers.wait(containerId)
})
.pipe(Effect.scoped)
.pipe(Effect.provide(layer))
.pipe(NodeRuntime.runMain)
Added in v1.0.0
demuxToSingleSink
Demux either a raw socket, multiplexed socket, or multiple raw socket(s) to a single sink.
Signature
export declare const demuxToSingleSink: {
<A1, E1, E2, R1, R2>(
socket: RawStreamSocket,
source: Stream.Stream<string | Uint8Array, E1, R1>,
sink: Sink.Sink<A1, string, string, E2, R2>,
options?: { encoding?: string | undefined } | undefined
): Effect.Effect<A1, E1 | E2 | Socket.SocketError, Exclude<R1, Scope.Scope> | Exclude<R2, Scope.Scope>>
<A1, E1, E2, R1, R2>(
source: Stream.Stream<string | Uint8Array, E1, R1>,
sink: Sink.Sink<A1, string, string, E2, R2>,
options?: { encoding?: string | undefined } | undefined
): (
socket: RawStreamSocket
) => Effect.Effect<A1, E1 | E2 | Socket.SocketError, Exclude<R1, Scope.Scope> | Exclude<R2, Scope.Scope>>
<A1, E1, E2, R1, R2>(
socket: MultiplexedStreamSocket,
source: Stream.Stream<string | Uint8Array, E1, R1>,
sink: Sink.Sink<A1, string, string, E2, R2>,
options?: { bufferSize?: number | undefined; encoding?: string | undefined } | undefined
): Effect.Effect<
A1,
E1 | E2 | Socket.SocketError | ParseResult.ParseError,
Exclude<R1, Scope.Scope> | Exclude<R2, Scope.Scope>
>
<A1, E1, E2, R1, R2>(
source: Stream.Stream<string | Uint8Array, E1, R1>,
sink: Sink.Sink<A1, string, string, E2, R2>,
options?: { bufferSize?: number | undefined; encoding?: string | undefined } | undefined
): (
socket: MultiplexedStreamSocket
) => Effect.Effect<
A1,
E1 | E2 | Socket.SocketError | ParseResult.ParseError,
Exclude<R1, Scope.Scope> | Exclude<R2, Scope.Scope>
>
<A1, E1, E2, R1, R2>(
sockets: Demux.StdinStdoutStderrSocketOptions,
source: Stream.Stream<string | Uint8Array, E1, R1>,
sink: Sink.Sink<A1, string, string, E2, R2>,
options?: { encoding?: string | undefined } | undefined
): Effect.Effect<A1, E1 | E2 | Socket.SocketError, Exclude<R1, Scope.Scope> | Exclude<R2, Scope.Scope>>
<A1, E1, E2, R1, R2>(
source: Stream.Stream<string | Uint8Array, E1, R1>,
sink: Sink.Sink<A1, string, string, E2, R2>,
options?: { encoding?: string | undefined } | undefined
): (
sockets: Demux.StdinStdoutStderrSocketOptions
) => Effect.Effect<A1, E1 | E2 | Socket.SocketError, Exclude<R1, Scope.Scope> | Exclude<R2, Scope.Scope>>
}
Example
// Demux a single raw socket to one sink
import * as NodeRuntime from "@effect/platform-node/NodeRuntime"
import * as Chunk from "effect/Chunk"
import * as Effect from "effect/Effect"
import * as Function from "effect/Function"
import * as Layer from "effect/Layer"
import * as Sink from "effect/Sink"
import * as Stream from "effect/Stream"
import * as MobyConnection from "the-moby-effect/MobyConnection"
import * as MobyConvey from "the-moby-effect/MobyConvey"
import * as MobyDemux from "the-moby-effect/MobyDemux"
import * as MobyEndpoints from "the-moby-effect/MobyEndpoints"
import * as DockerEngine from "the-moby-effect/DockerEngine"
const layer = Function.pipe(
MobyConnection.connectionOptionsFromPlatformSystemSocketDefault,
Effect.map(DockerEngine.layerNodeJS),
Layer.unwrapEffect
)
Effect.gen(function* () {
const image = "ubuntu:latest"
const containers = yield* MobyEndpoints.Containers
// Pull the image, which will be removed when the scope is closed
const pullStream = DockerEngine.pull({ image })
yield* MobyConvey.followProgressInConsole(pullStream)
// Start a container, which will be removed when the scope is closed
const { Id: containerId } = yield* DockerEngine.runScoped({
spec: {
Image: image,
Tty: true,
OpenStdin: true,
Cmd: ["bash", "-c", 'read | md5sum && >&2 echo "Hi2"']
}
})
// Since the container was started with "tty: true",
// we should get a raw socket here
const socket: MobyDemux.RawStreamSocket | MobyDemux.MultiplexedStreamSocket = yield* containers.attach(containerId, {
stdin: true,
stdout: true,
stderr: true,
stream: true
})
assert.ok(MobyDemux.isRawStreamSocket(socket), "Expected a raw socket")
// Demux to a single sink
const input = Stream.make("a\n")
const data = yield* MobyDemux.demuxToSingleSink(socket, input, Sink.mkString)
assert.strictEqual(data, "a\r\nd41d8cd98f00b204e9800998ecf8427e -\r\nHi2\r\n")
// Wait for the container to exit
yield* containers.wait(containerId)
})
.pipe(Effect.scoped)
.pipe(Effect.provide(layer))
.pipe(NodeRuntime.runMain)
Example
// Demux a multiplexed socket to one sink
import * as NodeRuntime from "@effect/platform-node/NodeRuntime"
import * as Chunk from "effect/Chunk"
import * as Effect from "effect/Effect"
import * as Function from "effect/Function"
import * as Layer from "effect/Layer"
import * as Sink from "effect/Sink"
import * as Stream from "effect/Stream"
import * as MobyConnection from "the-moby-effect/MobyConnection"
import * as MobyConvey from "the-moby-effect/MobyConvey"
import * as MobyDemux from "the-moby-effect/MobyDemux"
import * as MobyEndpoints from "the-moby-effect/MobyEndpoints"
import * as DockerEngine from "the-moby-effect/DockerEngine"
const layer = Function.pipe(
MobyConnection.connectionOptionsFromPlatformSystemSocketDefault,
Effect.map(DockerEngine.layerNodeJS),
Layer.unwrapEffect
)
Effect.gen(function* () {
const image = "ubuntu:latest"
const containers = yield* MobyEndpoints.Containers
// Pull the image, which will be removed when the scope is closed
const pullStream = DockerEngine.pull({ image })
yield* MobyConvey.followProgressInConsole(pullStream)
// Start a container, which will be removed when the scope is closed
const { Id: containerId } = yield* DockerEngine.runScoped({
spec: {
Image: image,
// Tty: true,
OpenStdin: true,
Cmd: ["bash", "-c", 'read | md5sum && >&2 echo "Hi2"']
}
})
// Since the container was started with "tty: false",
// we should get a multiplexed socket here
const socket: MobyDemux.RawStreamSocket | MobyDemux.MultiplexedStreamSocket = yield* containers.attach(containerId, {
stdin: true,
stdout: true,
stderr: true,
stream: true
})
assert.ok(MobyDemux.isMultiplexedStreamSocket(socket), "Expected a multiplexed stream socket")
// Demux to a single sink
const input = Stream.make("a\n")
const data = yield* MobyDemux.demuxToSingleSink(socket, input, Sink.mkString)
assert.strictEqual(data, "d41d8cd98f00b204e9800998ecf8427e -\nHi2\n")
// Wait for the container to exit
yield* containers.wait(containerId)
})
.pipe(Effect.scoped)
.pipe(Effect.provide(layer))
.pipe(NodeRuntime.runMain)
Example
// Demux multiple raw sockets to one sink
import * as NodeRuntime from "@effect/platform-node/NodeRuntime"
import * as Chunk from "effect/Chunk"
import * as Effect from "effect/Effect"
import * as Function from "effect/Function"
import * as Layer from "effect/Layer"
import * as Sink from "effect/Sink"
import * as Stream from "effect/Stream"
import * as MobyConnection from "the-moby-effect/MobyConnection"
import * as MobyConvey from "the-moby-effect/MobyConvey"
import * as MobyDemux from "the-moby-effect/MobyDemux"
import * as MobyEndpoints from "the-moby-effect/MobyEndpoints"
import * as DockerEngine from "the-moby-effect/engines/Docker"
const layer = Function.pipe(
MobyConnection.connectionOptionsFromPlatformSystemSocketDefault,
Effect.map(DockerEngine.layerNodeJS),
Layer.unwrapEffect
)
Effect.gen(function* () {
const image = "ubuntu:latest"
const containers = yield* MobyEndpoints.Containers
// Pull the image, which will be removed when the scope is closed
const pullStream = DockerEngine.pull({ image })
yield* MobyConvey.followProgressInConsole(pullStream)
// Start a container, which will be removed when the scope is closed
const { Id: containerId } = yield* DockerEngine.runScoped({
spec: {
Image: image,
// Tty: true,
OpenStdin: true,
Cmd: ["bash", "-c", 'read | md5sum && >&2 echo "Hi2"']
}
})
// It doesn't matter what tty option we start the container
// with here, we will only get a raw socket
const stdinSocket: MobyDemux.RawStreamSocket = yield* containers.attachWebsocket(containerId, {
stdin: true,
stream: true
})
const stdoutSocket: MobyDemux.RawStreamSocket = yield* containers.attachWebsocket(containerId, {
stdout: true,
stream: true
})
const stderrSocket: MobyDemux.RawStreamSocket = yield* containers.attachWebsocket(containerId, {
stderr: true,
stream: true
})
assert.ok(MobyDemux.isRawStreamSocket(stdinSocket), "Expected a raw socket")
assert.ok(MobyDemux.isRawStreamSocket(stdoutSocket), "Expected a raw socket")
assert.ok(MobyDemux.isRawStreamSocket(stderrSocket), "Expected a raw socket")
// Demux to a single sink
const data = yield* MobyDemux.demuxToSingleSink(
{
stdin: stdinSocket,
stdout: stdoutSocket,
stderr: stderrSocket
},
Stream.make("a\n"),
Sink.mkString
)
assert.ok(
[
// When tty: false
"d41d8cd98f00b204e9800998ecf8427e -\nHi2\n",
"Hi2\nd41d8cd98f00b204e9800998ecf8427e -\n",
// When tty: true
"a\r\nd41d8cd98f00b204e9800998ecf8427e -\r\nHi2\r\n",
"a\r\nHi2\r\nd41d8cd98f00b204e9800998ecf8427e -\r\n"
].includes(data)
)
// Wait for the container to exit
yield* containers.wait(containerId)
})
.pipe(Effect.scoped)
.pipe(Effect.provide(layer))
.pipe(NodeRuntime.runMain)
Example
// Demux a single raw socket to one sink
import * as NodeRuntime from "@effect/platform-node/NodeRuntime"
import * as Chunk from "effect/Chunk"
import * as Effect from "effect/Effect"
import * as Function from "effect/Function"
import * as Layer from "effect/Layer"
import * as Sink from "effect/Sink"
import * as Stream from "effect/Stream"
import * as MobyConnection from "the-moby-effect/MobyConnection"
import * as MobyConvey from "the-moby-effect/MobyConvey"
import * as MobyDemux from "the-moby-effect/MobyDemux"
import * as MobyEndpoints from "the-moby-effect/MobyEndpoints"
import * as DockerEngine from "the-moby-effect/engines/Docker"
const layer = Function.pipe(
MobyConnection.connectionOptionsFromPlatformSystemSocketDefault,
Effect.map(DockerEngine.layerNodeJS),
Layer.unwrapEffect
)
Effect.gen(function* () {
const image = "ubuntu:latest"
const containers = yield* MobyEndpoints.Containers
// Pull the image, which will be removed when the scope is closed
const pullStream = DockerEngine.pull({ image })
yield* MobyConvey.followProgressInConsole(pullStream)
// Start a container, which will be removed when the scope is closed
const { Id: containerId } = yield* DockerEngine.runScoped({
spec: {
Image: image,
// Tty: true,
AttachStdout: true,
Cmd: ["bash", "-c", 'sleep 2s && echo "Hi" && >&2 echo "Hi2"']
}
})
// It doesn't matter what tty option we start the container
// with here, we will only get a raw socket
const stdoutSocket: MobyDemux.RawStreamSocket = yield* containers.attachWebsocket(containerId, {
stdout: true,
stream: true
})
assert.ok(MobyDemux.isRawStreamSocket(stdoutSocket), "Expected a raw socket")
// Demux to a single sink
const data = yield* MobyDemux.demuxToSingleSink({ stdout: stdoutSocket }, Stream.make("a\n"), Sink.mkString)
assert.strictEqual(data, "Hi\n")
// Wait for the container to exit
yield* containers.wait(containerId)
})
.pipe(Effect.scoped)
.pipe(Effect.provide(layer))
.pipe(NodeRuntime.runMain)
Added in v1.0.0
demuxUnknownToSeparateSinks
Like {@link demuxAnyToSeparateSinks}, but with unknown sockets and a unified signature.
Signature
export declare const demuxUnknownToSeparateSinks: {
<A1, A2, E1, E2, E3, R1, R2, R3>(
source: Stream.Stream<string | Uint8Array, E1, R1>,
sink1: Sink.Sink<A1, string, string, E2, R2>,
sink2: Sink.Sink<A2, string, string, E3, R3>,
options?: { bufferSize?: number | undefined; encoding?: string | undefined } | undefined
): (
socket: Demux.AnySocketOptions
) => Effect.Effect<
void,
E1 | E2 | E3 | Socket.SocketError | ParseResult.ParseError,
Exclude<R1, Scope.Scope> | Exclude<R2, Scope.Scope> | Exclude<R3, Scope.Scope>
>
<A1, A2, E1, E2, E3, R1, R2, R3>(
socket: Demux.AnySocketOptions,
source: Stream.Stream<string | Uint8Array, E1, R1>,
sink1: Sink.Sink<A1, string, string, E2, R2>,
sink2: Sink.Sink<A2, string, string, E3, R3>,
options?: { bufferSize?: number | undefined; encoding?: string | undefined } | undefined
): Effect.Effect<
void,
E1 | E2 | E3 | Socket.SocketError | ParseResult.ParseError,
Exclude<R1, Scope.Scope> | Exclude<R2, Scope.Scope> | Exclude<R3, Scope.Scope>
>
}
Added in v1.0.0
demuxUnknownToSingleSink
Like {@link demuxToSingleSink}, but with unknown input socket.
Signature
export declare const demuxUnknownToSingleSink: {
<A1, E1, E2, R1, R2>(
source: Stream.Stream<string | Uint8Array, E1, R1>,
sink: Sink.Sink<A1, string, string, E2, R2>,
options?: { bufferSize?: number | undefined; encoding?: string | undefined } | undefined
): (
socket: Demux.AnySocketOptions
) => Effect.Effect<
A1,
E1 | E2 | Socket.SocketError | ParseResult.ParseError,
Exclude<R1, Scope.Scope> | Exclude<R2, Scope.Scope>
>
<A1, E1, E2, R1, R2>(
socket: Demux.AnySocketOptions,
source: Stream.Stream<string | Uint8Array, E1, R1>,
sink: Sink.Sink<A1, string, string, E2, R2>,
options?: { bufferSize?: number | undefined; encoding?: string | undefined } | undefined
): Effect.Effect<
A1,
E1 | E2 | Socket.SocketError | ParseResult.ParseError,
Exclude<R1, Scope.Scope> | Exclude<R2, Scope.Scope>
>
}
Added in v1.0.0
Types
Demux (namespace)
Added in v1.0.0
AnySocketOptions (type alias)
Signature
export type AnySocketOptions = RawStreamSocket | MultiplexedStreamSocket | StdinStdoutStderrSocketOptions
Added in v1.0.0
StdinStdoutStderrSocketOptions (type alias)
Signature
export type StdinStdoutStderrSocketOptions =
| { stdin: RawStreamSocket; stdout?: never; stderr?: never }
| { stdin?: never; stdout: RawStreamSocket; stderr?: never }
| { stdin?: never; stdout?: never; stderr: RawStreamSocket }
| { stdin: RawStreamSocket; stdout: RawStreamSocket; stderr?: never }
| { stdin: RawStreamSocket; stdout?: never; stderr: RawStreamSocket }
| { stdin?: never; stdout: RawStreamSocket; stderr: RawStreamSocket }
| { stdin: RawStreamSocket; stdout: RawStreamSocket; stderr: RawStreamSocket }
Added in v1.0.0