A powerful TypeScript library for building JSON processing pipelines with async iterables. This library makes it easy to create efficient command-line programs that process JSON data through a series of transformations, with support for streaming and backpressure.
- Process JSON data streams using async iterables
- Rich set of transformation operators:
  - map and mapAsync with configurable concurrency
  - flatMap and flatMapAsync for nested transformations
  - filter for conditional processing
  - distinct for deduplication
  - groupBy and groupWhile for data aggregation
  - batch for processing items in chunks
  - take and takeWhile for limiting output
  - reduce for aggregating results
- Built-in support for reading/writing JSON Lines format
- Composable transformations using pipe and compose
- Efficient streaming with backpressure handling
- Written in TypeScript with full type safety
- Node.js 8+ compatible
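To illustrate what streaming with backpressure means for async iterables, here is a minimal sketch built from plain async generators. It does not use this library and is not its implementation; `Stage`, `mapItems`, `takeItems`, `naturals`, `collect`, and `demo` are hypothetical names introduced only for this example. The point is that each stage pulls from the one before it, so the source only produces an item when the consumer asks for one.

```typescript
// Illustrative only: these helpers are NOT the library's implementation.
type Stage<A, B> = (source: AsyncIterable<A>) => AsyncIterable<B>;

// Lazily transform each item as the consumer asks for it.
function mapItems<A, B>(fn: (item: A) => B): Stage<A, B> {
  return async function* (source) {
    for await (const item of source) {
      yield fn(item);
    }
  };
}

// Stop pulling from the source once `n` items have been emitted.
function takeItems<A>(n: number): Stage<A, A> {
  return async function* (source) {
    if (n <= 0) return;
    let emitted = 0;
    for await (const item of source) {
      yield item;
      emitted += 1;
      if (emitted >= n) return;
    }
  };
}

// An unbounded source that records how many items it has produced.
let produced = 0;
async function* naturals(): AsyncGenerator<number> {
  while (true) {
    produced += 1;
    yield produced;
  }
}

// Drain an async iterable into an array.
async function collect<T>(source: AsyncIterable<T>): Promise<T[]> {
  const items: T[] = [];
  for await (const item of source) items.push(item);
  return items;
}

async function demo(): Promise<number[]> {
  const doubled = mapItems((n: number) => n * 2)(naturals());
  return collect(takeItems<number>(3)(doubled));
}
```

Calling `demo()` resolves to `[2, 4, 6]`, and `produced` ends at 3 even though the source is infinite, because nothing upstream runs ahead of the consumer.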
npm install @space48/json-pipe
To install the latest beta version:
npm install @space48/json-pipe@beta
Beta versions are automatically published:
- When code is pushed to the develop branch (version format: x.y.z-beta.commit-hash)
- When a pull request is opened against master (version format: x.y.z-beta.pr.number)
import { pipe, readJsonLinesFrom, writeJsonLinesTo, map } from '@space48/json-pipe';

// Read JSON Lines from stdin, transform them, and write to stdout
await pipe(
  readJsonLinesFrom(process.stdin),
  map(data => {
    // Your transformation logic here; this placeholder returns the item unchanged
    return data;
  }),
  writeJsonLinesTo(process.stdout)
);
import { mapAsync } from '@space48/json-pipe';

// Process items concurrently
const transform = mapAsync(
  { concurrency: 5 }, // Process up to 5 items at a time
  async item => {
    // Async transformation logic goes here; this placeholder returns the item unchanged
    return item;
  }
);
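For readers curious how a concurrency limit can work over an async iterable, the following is a rough sketch, not the library's `mapAsync`. `mapWithConcurrency`, `slowSquare`, `fromArray`, and `runDemo` are hypothetical names, and the choice to emit results in input order is an assumption of this sketch (the library may behave differently). Error handling is deliberately omitted.

```typescript
// Hypothetical helper: keeps at most `concurrency` calls to `fn` in flight.
function mapWithConcurrency<A, B>(
  concurrency: number,
  fn: (item: A) => Promise<B>,
): (source: AsyncIterable<A>) => AsyncIterable<B> {
  return async function* (source) {
    const inFlight: Promise<B>[] = [];
    for await (const item of source) {
      inFlight.push(fn(item));
      // Once the window is full, wait for the oldest result before pulling more input.
      if (inFlight.length >= concurrency) {
        yield await inFlight.shift()!;
      }
    }
    // Drain whatever is still running after the source ends.
    while (inFlight.length > 0) {
      yield await inFlight.shift()!;
    }
  };
}

// Instrumented worker so the demo can observe the peak number of running calls.
let running = 0;
let maxRunning = 0;
async function slowSquare(n: number): Promise<number> {
  running += 1;
  maxRunning = Math.max(maxRunning, running);
  await new Promise<void>(resolve => setTimeout(resolve, 10));
  running -= 1;
  return n * n;
}

async function* fromArray<T>(items: T[]): AsyncGenerator<T> {
  for (const item of items) yield item;
}

async function runDemo(): Promise<number[]> {
  const results: number[] = [];
  const squares = mapWithConcurrency(2, slowSquare)(fromArray([1, 2, 3, 4, 5]));
  for await (const value of squares) results.push(value);
  return results;
}
```

With a limit of 2, `runDemo()` resolves to `[1, 4, 9, 16, 25]` and `maxRunning` never exceeds 2. Waiting on the oldest pending call is the simplest way to bound the window; a production version would likely also handle rejections from calls that are still queued.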
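import { pipe, readJsonLinesFrom, groupBy, batch } from '@space48/json-pipe';

// Any async iterable can serve as input; stdin is used here as an example
const input = readJsonLinesFrom(process.stdin);

const groupedTransform = pipe(
  input,
  // Group items by a key
  groupBy(item => item.category),
  // Process in batches of 100
  batch(100)
);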
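As a rough picture of what batching a stream involves, here is a self-contained sketch using a plain async generator. It is not the library's `batch`; `batchItems`, `numbersUpTo`, and `batchDemo` are hypothetical names, and emitting a final partial batch is an assumption of this sketch.

```typescript
// Hypothetical helper: collect items into arrays of at most `size` elements.
function batchItems<T>(size: number): (source: AsyncIterable<T>) => AsyncIterable<T[]> {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError("batch size must be a positive integer");
  }
  return async function* (source) {
    let current: T[] = [];
    for await (const item of source) {
      current.push(item);
      if (current.length === size) {
        yield current;
        current = []; // start a fresh array so emitted batches are never mutated
      }
    }
    // Emit the trailing partial batch, if any.
    if (current.length > 0) yield current;
  };
}

async function* numbersUpTo(limit: number): AsyncGenerator<number> {
  for (let n = 1; n <= limit; n += 1) yield n;
}

async function batchDemo(): Promise<number[][]> {
  const batches: number[][] = [];
  for await (const group of batchItems<number>(2)(numbersUpTo(5))) batches.push(group);
  return batches;
}
```

Here `batchDemo()` resolves to `[[1, 2], [3, 4], [5]]`: two full batches followed by the remainder.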
- pipe(...fns): Compose multiple transformations
- compose(...fns): Compose functions from right to left
- map(fn): Transform each item
- mapAsync(options, fn): Transform items asynchronously with concurrency control
- flatMap(fn): Transform and flatten results
- flatMapAsync(options, fn): Transform and flatten asynchronously
- filter(predicate): Filter items based on a predicate
- distinct(compare?): Remove duplicates
- groupBy(getKeyFn): Group items by key
- batch(size): Process items in batches
- take(n): Limit number of items
- reduce(fn, initialValue?): Aggregate items
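The difference in ordering between left-to-right and right-to-left composition can be seen with two plain functions. The sketch below is independent of this library: `pipeLeftToRight` and `composeRightToLeft` are hypothetical stand-ins, not the exported `pipe` and `compose`, and they only handle unary functions of a single type. Note that in the usage examples the library's `pipe` takes the input as its first argument, which this sketch does not model.

```typescript
type Unary<T> = (value: T) => T;

// Apply functions in the order they are listed: first argument runs first.
function pipeLeftToRight<T>(...fns: Unary<T>[]): Unary<T> {
  return value => fns.reduce((acc, fn) => fn(acc), value);
}

// Apply functions in reverse order: last argument runs first.
function composeRightToLeft<T>(...fns: Unary<T>[]): Unary<T> {
  return value => fns.reduceRight((acc, fn) => fn(acc), value);
}

const addOne = (n: number): number => n + 1;
const double = (n: number): number => n * 2;

const piped = pipeLeftToRight(addOne, double)(3);       // double(addOne(3)) = 8
const composed = composeRightToLeft(addOne, double)(3); // addOne(double(3)) = 7
```

The same two functions give different results purely because of the order in which they are applied.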
- readJsonLinesFrom(source): Read JSON Lines from a readable stream
- writeJsonLinesTo(destination): Write JSON Lines to a writable stream
- readLinesFrom(source): Read lines from a readable stream
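For context on the JSON Lines format itself (one JSON value per line), the sketch below parses it from arbitrary text chunks and serializes values back to lines. It is a simplified illustration, not how `readJsonLinesFrom` or `writeJsonLinesTo` are implemented; `splitLines`, `parseJsonLines`, `toJsonLine`, `chunksOf`, and `jsonLinesDemo` are hypothetical names, and skipping blank lines is an assumption of this sketch.

```typescript
// Reassemble complete lines from chunks that may split a line anywhere.
async function* splitLines(chunks: AsyncIterable<string>): AsyncGenerator<string> {
  let buffer = "";
  for await (const chunk of chunks) {
    buffer += chunk;
    let newline = buffer.indexOf("\n");
    while (newline !== -1) {
      yield buffer.slice(0, newline);
      buffer = buffer.slice(newline + 1);
      newline = buffer.indexOf("\n");
    }
  }
  // The last line may not end with a newline.
  if (buffer.length > 0) yield buffer;
}

// Parse each non-empty line as one JSON value.
async function* parseJsonLines(chunks: AsyncIterable<string>): AsyncGenerator<unknown> {
  for await (const line of splitLines(chunks)) {
    const trimmed = line.trim(); // also removes a trailing "\r" from CRLF input
    if (trimmed === "") continue;
    yield JSON.parse(trimmed);
  }
}

// Serialize one value as a single JSON Lines record.
function toJsonLine(value: unknown): string {
  return JSON.stringify(value) + "\n";
}

async function* chunksOf(parts: string[]): AsyncGenerator<string> {
  for (const part of parts) yield part;
}

async function jsonLinesDemo(): Promise<unknown[]> {
  // The second record is deliberately split across two chunks.
  const input = chunksOf(['{"id":1}\n{"i', 'd":2}\n', '\n{"id":3}']);
  const values: unknown[] = [];
  for await (const value of parseJsonLines(input)) values.push(value);
  return values;
}
```

`jsonLinesDemo()` resolves to three objects with ids 1, 2, and 3, showing that records survive being split across chunk boundaries and that the blank line is ignored.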
- Node.js >= 8
- TypeScript (for development)
- Clone the repository:
git clone https://github.com/Space48/json-pipe.git
- Install dependencies:
npm install
- Build the project:
npm run build
This project is licensed under the MIT License - see the LICENSE file for details.
For bug reports and feature requests, please open an issue on the GitHub repository.
Maintained by Space48