alias esrt='uvx [email protected]'
esrt -Valias esrt='pipx run esrt==8.2.0'
esrt -Vsearchscanrequestbulk
ping
You can start an es service with docker.
docker run --name "esrt-es" --rm -itd --platform=linux/amd64 -p 9200:9200 elasticsearch:5.6.9-alpine
# install sql command and restart container:
docker exec "esrt-es" elasticsearch-plugin install https://github.com/NLPchina/elasticsearch-sql/releases/download/5.6.9.0/elasticsearch-sql-5.6.9.0.zip
docker restart "esrt-es"Check server:
esrt es request localhost -X HEAD
# ->
# trueCreate a index:
esrt es request localhost -X PUT /my-index
# ->
# {
#   "acknowledged": true,
#   "shards_acknowledged": true,
#   "index": "my-index"
# }If you want to esrt quote url path for you, add flag: -Q(--quote-url)
Cat it:
esrt es request localhost -X GET /_cat/indices
# ->
# yellow open my-index NMHssX4qTgeMFrA3cXPoKg 5 1 0 0 324b 324b
esrt es request localhost -X GET /_cat/indices -p v=
# ->
# health status index    uuid                   pri rep docs.count docs.deleted store.size pri.store.size
# yellow open   my-index NMHssX4qTgeMFrA3cXPoKg   5   1          0            0       810b           810b
esrt es request localhost -X GET /_cat/indices -p v= -p format=json
# ->
# [
#   {
#     "health": "yellow",
#     "status": "open",
#     "index": "my-index",
#     "uuid": "NMHssX4qTgeMFrA3cXPoKg",
#     "pri": "5",
#     "rep": "1",
#     "docs.count": "0",
#     "docs.deleted": "0",
#     "store.size": "810b",
#     "pri.store.size": "810b"
#   }
# ]Bulk with data from file examples/bulk.ndjson:
{ "_op_type": "index",  "_index": "my-index", "_type": "type1", "_id": "1", "field1": "ii" }
{ "_op_type": "delete", "_index": "my-index", "_type": "type1", "_id": "1" }
{ "_op_type": "create", "_index": "my-index", "_type": "type1", "_id": "1", "field1": "cc" }
{ "_op_type": "update", "_index": "my-index", "_type": "type1", "_id": "1", "doc": {"field2": "uu"} }esrt es bulk localhost -y -f examples/bulk.ndjson
# ->
# ⠋ bulk ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 0:00:00    4/? ?Read payload from stdin. And -d can be omitted.
esrt es bulk localhost -y <<EOF
{ "_op_type": "index",  "_index": "my-index-2", "_type": "type1", "_id": "1", "field1": "11" }
{ "_op_type": "index",  "_index": "my-index-2", "_type": "type1", "_id": "2", "field1": "22" }
{ "_op_type": "index",  "_index": "my-index-2", "_type": "type1", "_id": "3", "field1": "33" }
EOF
# ->
# ⠋ bulk ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 0:00:00    3/? ?Piping heredoc also works.
cat <<EOF | esrt es bulk localhost -y
{ "_op_type": "index",  "_index": "my-index-2", "_type": "type1", "_id": "1", "field1": "11" }
{ "_op_type": "index",  "_index": "my-index-2", "_type": "type1", "_id": "2", "field1": "22" }
{ "_op_type": "index",  "_index": "my-index-2", "_type": "type1", "_id": "3", "field1": "33" }
EOF
# ->
# ⠋ bulk ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 0:00:00    3/? ?Pipe _search result and update _index with customized handler to do more operations before bulk!
alias jq_es_hits="jq '.hits.hits[]'"esrt es request localhost -X GET /my-index-2/_search | jq_es_hits -c | esrt es bulk localhost -y -w examples.my-handlers:handle  # <- `examples/my-handlers.py`
# ->
# ⠹ bulk ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 0:00:00    3/? ?# examples/my-handlers.py
import json
import typing as t
from esrt es import DocHandler
# function style
def my_handler(actions: t.Iterable[str]):
    for action in actions:
        obj = json.loads(action)
        prefix = 'new-'
        if not t.cast(str, obj['_index']).startswith(prefix):
            obj['_index'] = prefix + obj['_index']
        yield obj
# class style
class MyHandler(DocHandler):
    def handle(self, actions: t.Iterable[str]):
        for action in actions:
            yield self.handle_one(action)
    def handle_one(self, action: str):
        obj = super().handle_one(action)
        prefix = 'new-'
        if not t.cast(str, obj['_index']).startswith(prefix):
            obj['_index'] = prefix + obj['_index']
        return objesrt es search localhost | jq_es_hits -c
# ->
# {"_index":"my-index-2","_type":"type1","_id":"2","_score":1.0,"_source":{"field1":"22"}}
# {"_index":"new-my-index-2","_type":"type1","_id":"2","_score":1.0,"_source":{"field1":"22"}}
# {"_index":"my-index","_type":"type1","_id":"1","_score":1.0,"_source":{"field1":"cc","field2":"uu"}}
# {"_index":"my-index-2","_type":"type1","_id":"1","_score":1.0,"_source":{"field1":"11"}}
# {"_index":"new-my-index-2","_type":"type1","_id":"1","_score":1.0,"_source":{"field1":"11"}}
# {"_index":"my-index-2","_type":"type1","_id":"3","_score":1.0,"_source":{"field1":"33"}}
# {"_index":"new-my-index-2","_type":"type1","_id":"3","_score":1.0,"_source":{"field1":"33"}}esrt es search localhost -f - <<EOF | jq_es_hits -c
{"query": {"term": {"_index": "new-my-index-2"}}}
EOF
# ->
# {"_index":"new-my-index-2","_type":"type1","_id":"2","_score":1.0,"_source":{"field1":"22"}}
# {"_index":"new-my-index-2","_type":"type1","_id":"1","_score":1.0,"_source":{"field1":"11"}}
# {"_index":"new-my-index-2","_type":"type1","_id":"3","_score":1.0,"_source":{"field1":"33"}}esrt es scan localhost -y
# ->
# {"_index": "my-index-2", "_type": "type1", "_id": "2", "_score": null, "_source": {"field1": "22"}, "sort": [0]}
# {"_index": "new-my-index-2", "_type": "type1", "_id": "2", "_score": null, "_source": {"field1": "22"}, "sort": [0]}
# {"_index": "my-index", "_type": "type1", "_id": "1", "_score": null, "_source": {"field1": "cc", "field2": "uu"}, "sort": [0]}
# {"_index": "my-index-2", "_type": "type1", "_id": "1", "_score": null, "_source": {"field1": "11"}, "sort": [0]}
# {"_index": "new-my-index-2", "_type": "type1", "_id": "1", "_score": null, "_source": {"field1": "11"}, "sort": [0]}
# {"_index": "my-index-2", "_type": "type1", "_id": "3", "_score": null, "_source": {"field1": "33"}, "sort": [0]}
# {"_index": "new-my-index-2", "_type": "type1", "_id": "3", "_score": null, "_source": {"field1": "33"}, "sort": [0]}
esrt es scan localhost -y -f - <<EOF
{"query": {"term": {"field1": "cc"}}}
EOF
# ->
# {"_index": "my-index", "_type": "type1", "_id": "1", "_score": null, "_source": {"field1": "cc", "field2": "uu"}, "sort": [0]}
# examples/create-massive-docs.py
import json
import uuid
if __name__ == '__main__':
    for i, _ in enumerate(range(54321), start=1):
        d = {
            '_index': 'my-index-a',
            '_id': i,
            '_type': 'type1',
            '_source': {'field1': str(uuid.uuid4())},
        }
        print(json.dumps(d))python examples/create-massive-docs.py | tee _.ndjson | esrt es bulk localhost -y -c 10000
# ->
# ⠋ bulk ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 0:00:26    654321/? 24504/s
cat _.ndjson  # <- 79M
head _.ndjson
# ->
# {"_index": "my-index-a", "_type": "type1", "_id": "70004", "_score": null, "_source": {"field1": "7fc553c1-d09f-4793-bc28-2a16a6050ef4"}, "sort": [0]}
# {"_index": "my-index-a", "_type": "type1", "_id": "80002", "_score": null, "_source": {"field1": "7fddf2f7-195f-4964-81f1-bb32d63be8b0"}, "sort": [0]}
# {"_index": "my-index-2", "_type": "type1", "_id": "2", "_score": null, "_source": {"field1": "22"}, "sort": [0]}
# {"_index": "my-index-a", "_type": "type1", "_id": "70003", "_score": null, "_source": {"field1": "2a08f0e0-cdbd-47d3-b7e3-ee8fd1e27ff8"}, "sort": [0]}
# {"_index": "new-my-index-2", "_type": "type1", "_id": "2", "_score": null, "_source": {"field1": "22"}, "sort": [0]}
# {"_index": "my-index", "_type": "type1", "_id": "1", "_score": null, "_source": {"field1": "cc", "field2": "uu"}, "sort": [0]}
# {"_index": "my-index-2", "_type": "type1", "_id": "1", "_score": null, "_source": {"field1": "11"}, "sort": [0]}
# {"_index": "new-my-index-2", "_type": "type1", "_id": "1", "_score": null, "_source": {"field1": "11"}, "sort": [0]}
# {"_index": "my-index-2", "_type": "type1", "_id": "3", "_score": null, "_source": {"field1": "33"}, "sort": [0]}
# {"_index": "new-my-index-2", "_type": "type1", "_id": "3", "_score": null, "_source": {"field1": "33"}, "sort": [0]}
# examples/copy-more-docs.py
from copy import deepcopy
import json
import typing as t
import uuid
if __name__ == '__main__':
    for i, _ in enumerate(range(54321), start=1):
        d = {
            '_index': 'my-index-b',
            '_id': i,
            '_type': 'type1',
            '_source': {'field1': str(uuid.uuid4())},
        }
        print(json.dumps(d))
def handle(actions: t.Iterable[str]):
    for action in actions:
        d: dict[str, t.Any] = json.loads(action)
        yield d
        d2 = deepcopy(d)
        d2['_source']['field1'] += '!!!'
        d2['_source']['field2'] = str(uuid.uuid4())
        yield d2python examples/copy-more-docs.py | esrt es bulk localhost -y -w examples.copy-more-docs:handle
# ->
# ⠏ bulk ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 0:00:05    108642/? 18963/s