Skip to content

Commit 3332e6e

Browse files
Merge pull request #78 from Bluetab/feature/bulk-index-action
Bulk "index" action and HEAD support
2 parents 9e1cfd2 + e829bae commit 3332e6e

File tree

7 files changed

+102
-17
lines changed

7 files changed

+102
-17
lines changed

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,10 @@ config :my_app, MyApp.ElasticsearchCluster,
100100
# Likewise, wait a given period between posting pages to give
101101
# Elasticsearch time to catch up.
102102
bulk_wait_interval: 15_000, # 15 seconds
103+
104+
# By default bulk indexing uses the "create" action. To allow existing
105+
# documents to be replaced, use the "index" action instead.
106+
bulk_action: "create"
103107
}
104108
}
105109
```

lib/elasticsearch.ex

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,61 @@ defmodule Elasticsearch do
425425
|> unwrap!()
426426
end
427427

428+
@doc """
429+
Determines whether a resource exists at a given Elasticsearch path.
430+
431+
## Examples
432+
433+
iex> Index.create_from_file(Cluster, "posts", "test/support/settings/posts.json")
434+
...> Elasticsearch.head(Cluster, "/posts")
435+
{:ok, ""}
436+
437+
It returns an error if the given resource does not exist.
438+
439+
iex> Elasticsearch.head(Cluster, "/nonexistent")
440+
{:error,
441+
%Elasticsearch.Exception{
442+
col: nil,
443+
line: nil,
444+
message: "",
445+
query: nil,
446+
raw: nil,
447+
status: nil,
448+
type: nil
449+
}}
450+
"""
451+
@spec head(Cluster.t(), url) :: response
452+
@spec head(Cluster.t(), url, opts) :: response
453+
def head(cluster, url, opts \\ []) do
454+
config = Config.get(cluster)
455+
456+
config
457+
|> config.api.request(:head, url, "", opts)
458+
|> format()
459+
end
460+
461+
@doc """
462+
Same as `head/2`, but returns the response body directly and raises on errors.
463+
464+
## Examples
465+
466+
iex> Index.create_from_file(Cluster, "posts", "test/support/settings/posts.json")
467+
...> Elasticsearch.head!(Cluster, "/posts")
468+
""
469+
470+
Raises an error if the given resource does not exist.
471+
472+
iex> Elasticsearch.head!(Cluster, "/nonexistent")
473+
** (Elasticsearch.Exception)
474+
"""
475+
@spec head!(Cluster.t(), url) :: map | no_return
476+
@spec head!(Cluster.t(), url, opts) :: map | no_return
477+
def head!(cluster, url, opts \\ []) do
478+
cluster
479+
|> head(url, opts)
480+
|> unwrap!()
481+
end
482+
428483
defp format({:ok, %{status_code: code, body: body}})
429484
when code >= 200 and code < 300 do
430485
{:ok, body}

lib/elasticsearch/api/api.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Elasticsearch.API do
44
"""
55

66
@typedoc "An HTTP method"
7-
@type method :: :get | :put | :post | :delete
7+
@type method :: :get | :put | :post | :delete | :head
88

99
@typedoc "The URL to request from the API"
1010
@type url :: String.t()

lib/elasticsearch/cluster/config.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ defmodule Elasticsearch.Cluster.Config do
7373
defp validate_index({_name, settings}) do
7474
Vex.validate(
7575
settings,
76-
settings: [presence: true, by: &is_binary/1],
76+
settings: [by: &(is_binary(&1) or is_map(&1))],
7777
store: [presence: true, by: &is_module/1],
7878
sources: [
7979
presence: true,

lib/elasticsearch/indexing/bulk.ex

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,11 @@ defmodule Elasticsearch.Index.Bulk do
2727
%Protocol.UndefinedError{description: "",
2828
protocol: Elasticsearch.Document, value: 123}}
2929
"""
30-
@spec encode(Cluster.t(), struct, String.t()) ::
30+
@spec encode(Cluster.t(), struct, String.t(), String.t()) ::
3131
{:ok, String.t()}
3232
| {:error, Error.t()}
33-
def encode(cluster, struct, index) do
34-
{:ok, encode!(cluster, struct, index)}
33+
def encode(cluster, struct, index, action \\ "create") do
34+
{:ok, encode!(cluster, struct, index, action)}
3535
rescue
3636
exception ->
3737
{:error, exception}
@@ -51,9 +51,9 @@ defmodule Elasticsearch.Index.Bulk do
5151
iex> Bulk.encode!(Cluster, 123, "my-index")
5252
** (Protocol.UndefinedError) protocol Elasticsearch.Document not implemented for 123 of type Integer
5353
"""
54-
def encode!(cluster, struct, index) do
54+
def encode!(cluster, struct, index, action \\ "create") do
5555
config = Cluster.Config.get(cluster)
56-
header = header(config, "create", index, struct)
56+
header = header(config, action, index, struct)
5757

5858
document =
5959
struct
@@ -99,16 +99,17 @@ defmodule Elasticsearch.Index.Bulk do
9999
config = Cluster.Config.get(cluster)
100100
bulk_page_size = index_config[:bulk_page_size] || 5000
101101
bulk_wait_interval = index_config[:bulk_wait_interval] || 0
102+
action = index_config[:bulk_action] || "create"
102103

103104
errors =
104105
store.transaction(fn ->
105106
source
106107
|> store.stream()
107-
|> Stream.map(&encode!(config, &1, index_name))
108+
|> Stream.map(&encode!(config, &1, index_name, action))
108109
|> Stream.chunk_every(bulk_page_size)
109110
|> Stream.intersperse(bulk_wait_interval)
110111
|> Stream.map(&put_bulk_page(config, index_name, &1))
111-
|> Enum.reduce(errors, &collect_errors/2)
112+
|> Enum.reduce(errors, &collect_errors(&1, &2, action))
112113
end)
113114

114115
upload(config, index_name, %{index_config | sources: tail}, errors)
@@ -123,21 +124,21 @@ defmodule Elasticsearch.Index.Bulk do
123124
Elasticsearch.put(config, "/#{index_name}/_doc/_bulk", Enum.join(items))
124125
end
125126

126-
defp collect_errors({:ok, %{"errors" => true} = response}, errors) do
127+
defp collect_errors({:ok, %{"errors" => true} = response}, errors, action) do
127128
new_errors =
128129
response["items"]
129-
|> Enum.filter(&(&1["create"]["error"] != nil))
130-
|> Enum.map(& &1["create"])
130+
|> Enum.filter(&(&1[action]["error"] != nil))
131+
|> Enum.map(& &1[action])
131132
|> Enum.map(&Elasticsearch.Exception.exception(response: &1))
132133

133134
new_errors ++ errors
134135
end
135136

136-
defp collect_errors({:error, error}, errors) do
137+
defp collect_errors({:error, error}, errors, _action) do
137138
[error | errors]
138139
end
139140

140-
defp collect_errors(_response, errors) do
141+
defp collect_errors(_response, errors, _action) do
141142
errors
142143
end
143144
end

lib/elasticsearch/indexing/index.ex

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@ defmodule Elasticsearch.Index do
3030
alias = alias_to_atom(alias)
3131
name = build_name(alias)
3232
config = Config.get(cluster)
33-
%{settings: settings_file} = index_config = config[:indexes][alias]
33+
%{settings: settings} = index_config = config[:indexes][alias]
3434

35-
with :ok <- create_from_file(config, name, settings_file),
35+
with :ok <- create_from_settings(config, name, settings),
3636
:ok <- Bulk.upload(config, name, index_config),
3737
:ok <- __MODULE__.alias(config, name, to_string(alias)),
3838
:ok <- clean_starting_with(config, to_string(alias), 2),
@@ -273,6 +273,31 @@ defmodule Elasticsearch.Index do
273273
end
274274
end
275275

276+
@doc """
277+
Creates an index with the given name, with settings loaded from a map or a JSON file (see `create_from_file/3`).
278+
279+
## Example
280+
281+
iex> Index.create_from_settings(Cluster, "posts-1", %{})
282+
:ok
283+
284+
iex> Index.create_from_settings(Cluster, "posts-1", "nonexistent.json")
285+
{:error, :enoent}
286+
"""
287+
@spec create_from_settings(Cluster.t(), String.t(), map | Path.t()) ::
288+
:ok
289+
| {:error, File.posix()}
290+
| {:error, Elasticsearch.Exception.t()}
291+
def create_from_settings(cluster, name, settings)
292+
293+
def create_from_settings(cluster, name, settings) when is_map(settings) do
294+
create(cluster, name, settings)
295+
end
296+
297+
def create_from_settings(cluster, name, file) do
298+
create_from_file(cluster, name, file)
299+
end
300+
276301
@doc """
277302
Generates a name for an index that will be aliased to a given `alias`.
278303
Similar to migrations, the name will contain a timestamp.

test/elasticsearch/cluster/cluster_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ defmodule Elasticsearch.ClusterTest do
103103
test "validates indexes" do
104104
errors = errors_on(%{valid_config() | indexes: %{example: %{}}})
105105

106-
for field <- [:settings, :store, :sources, :bulk_page_size, :bulk_wait_interval] do
106+
for field <- [:store, :sources, :bulk_page_size, :bulk_wait_interval] do
107107
assert {"must be present", validation: :presence} in errors[field]
108108
end
109109

0 commit comments

Comments
 (0)