Skip to content

Commit 4a1dfa7

Browse files
committed
Reindex now allows the target type to be different (#2480)
* Reindex now allows the target type to be different User has to supply an explicit map between TSource and TTarget, the old overloads where TSource == TTarget still exists. The simplified helpers for reindex contained a bug, out integration tests only reindexed 2 documents but if you had more the backpressure sanity checks would through, these now run with sane defaults. * fix exception messages and ConsistentFluentParamNames tests * Simplify exception messages
1 parent ee0b100 commit 4a1dfa7

File tree

9 files changed

+314
-104
lines changed

9 files changed

+314
-104
lines changed

src/Nest/CommonAbstractions/Reactive/CoordinatedRequestObserverBase.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ internal static class CoordinatedRequestDefaults
88
public static TimeSpan BulkAllBackOffTimeDefault = TimeSpan.FromMinutes(1);
99
public static int BulkAllBackOffRetriesDefault = 0;
1010
public static int BulkAllSizeDefault = 1000;
11+
public static int ReindexBackPressureFactor = 4;
12+
public static int ReindexScrollSize = 500;
1113
}
1214

1315
public abstract class CoordinatedRequestObserverBase<T> : IObserver<T>

src/Nest/Document/Multiple/BulkIndexByScrollFailure.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
namespace Nest
44
{
5-
// ReindexOnServer and UpdateByQuery aggregate failures under a single failures property
5+
// TODO: ReindexOnServer and UpdateByQuery aggregate failures under a single failures property
66
// So the shape is a bit odd
77
// https://github.com/elastic/elasticsearch/issues/17539
88
// We could come up with abstractions and normalization here but we should fix this at the root for 5.0

src/Nest/Document/Multiple/Reindex/ElasticClient-Reindex.cs

Lines changed: 136 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -9,71 +9,174 @@ public partial interface IElasticClient
99
/// Helper method that allows you to reindex from one index into another using ScrollAll and BulkAll.
1010
/// </summary>
1111
/// <returns>An IObservable&lt;IReindexResponse&lt;T&gt;$gt; you can subscribe to to listen to the progress of the reindex process</returns>
12-
IObservable<IBulkAllResponse> Reindex<T>(
13-
Func<ReindexDescriptor<T>, IReindexRequest<T>> selector,
12+
IObservable<IBulkAllResponse> Reindex<TSource,TTarget>(
13+
Func<TSource, TTarget> mapper,
14+
Func<ReindexDescriptor<TSource,TTarget>, IReindexRequest<TSource,TTarget>> selector,
1415
CancellationToken cancellationToken = default(CancellationToken)
15-
) where T : class;
16+
)
17+
where TSource : class
18+
where TTarget : class;
19+
20+
/// <summary>
21+
/// Helper method that allows you to reindex from one index into another using ScrollAll and BulkAll.
22+
/// </summary>
23+
/// <returns>An IObservable&lt;IReindexResponse&lt;T&gt;$gt; you can subscribe to to listen to the progress of the reindex process</returns>
24+
IObservable<IBulkAllResponse> Reindex<TSource>(
25+
Func<ReindexDescriptor<TSource, TSource>, IReindexRequest<TSource, TSource>> selector,
26+
CancellationToken cancellationToken = default(CancellationToken)
27+
)
28+
where TSource : class;
29+
30+
/// <summary>
31+
/// Helper method that allows you to reindex from one index into another using ScrollAll and BulkAll.
32+
/// </summary>
33+
/// <param name="request">a request object to describe the reindex operation</param>
34+
/// <returns>An IObservable&lt;IReindexResponse&lt;T&gt;$gt; you can subscribe to to listen to the progress of the reindex process</returns>
35+
IObservable<IBulkAllResponse> Reindex<TSource,TTarget>(
36+
IReindexRequest<TSource,TTarget> request,
37+
CancellationToken cancellationToken = default(CancellationToken)
38+
)
39+
where TSource : class
40+
where TTarget : class;
41+
42+
/// <summary>
43+
/// Helper method that allows you to reindex from one index into another using ScrollAll and BulkAll.
44+
/// </summary>
45+
/// <param name="request">a request object to describe the reindex operation</param>
46+
/// <returns>An IObservable&lt;IReindexResponse&lt;T&gt;$gt; you can subscribe to to listen to the progress of the reindex process</returns>
47+
IObservable<IBulkAllResponse> Reindex<TSource>(
48+
IReindexRequest<TSource> request,
49+
CancellationToken cancellationToken = default(CancellationToken)
50+
)
51+
where TSource : class;
1652

1753
/// <summary>
1854
/// Simplified form for reindex which will cover 80% of its usecases. Allows you to index all documents of type T from <paramref name="fromIndex" /> to <paramref name="toIndex" />
19-
/// optionally limitting the documents found in <paramref name="fromIndex" /> by using <paramref name="selector"/>.
55+
/// optionally limiting the documents found in <paramref name="fromIndex" /> by using <paramref name="selector"/>.
2056
/// </summary>
2157
/// <param name="fromIndex">The source index, from which all types will be returned</param>
2258
/// <param name="toIndex">The target index, if it does not exist already will be created using the same settings of <paramref name="fromIndex"/></param>
23-
/// <param name="selector">an optional query limitting the documents found in <paramref name="fromIndex"/></param>
24-
IObservable<IBulkAllResponse> Reindex<T>(
59+
/// <param name="selector">an optional query limiting the documents found in <paramref name="fromIndex"/></param>
60+
IObservable<IBulkAllResponse> Reindex<TSource,TTarget>(
2561
IndexName fromIndex,
2662
IndexName toIndex,
27-
Func<QueryContainerDescriptor<T>, QueryContainer> selector = null,
63+
Func<TSource, TTarget> mapper,
64+
Func<QueryContainerDescriptor<TSource>, QueryContainer> selector = null,
2865
CancellationToken cancellationToken = default(CancellationToken)
29-
) where T : class;
66+
)
67+
where TSource : class
68+
where TTarget : class;
3069

3170
/// <summary>
32-
/// Helper method that allows you to reindex from one index into another using ScrollAll and BulkAll.
71+
/// Simplified form for reindex which will cover 80% of its use cases. Allows you to index all documents of type T from <paramref name="fromIndex" /> to <paramref name="toIndex" />
72+
/// optionally limiting the documents found in <paramref name="fromIndex" /> by using <paramref name="selector"/>.
3373
/// </summary>
34-
/// <param name="request">a request object to describe the reindex operation</param>
35-
/// <returns>An IObservable&lt;IReindexResponse&lt;T&gt;$gt; you can subscribe to to listen to the progress of the reindex process</returns>
36-
IObservable<IBulkAllResponse> Reindex<T>(
37-
IReindexRequest<T> request,
74+
/// <param name="fromIndex">The source index, from which all types will be returned</param>
75+
/// <param name="toIndex">The target index, if it does not exist already will be created using the same settings of <paramref name="fromIndex"/></param>
76+
/// <param name="selector">an optional query limiting the documents found in <paramref name="fromIndex"/></param>
77+
IObservable<IBulkAllResponse> Reindex<TSource>(
78+
IndexName fromIndex,
79+
IndexName toIndex,
80+
Func<QueryContainerDescriptor<TSource>, QueryContainer> selector = null,
3881
CancellationToken cancellationToken = default(CancellationToken)
39-
) where T : class;
82+
)
83+
where TSource : class;
84+
4085
}
4186

4287
public partial class ElasticClient
4388
{
4489
/// <inheritdoc />
45-
public IObservable<IBulkAllResponse> Reindex<T>(
46-
Func<ReindexDescriptor<T>, IReindexRequest<T>> selector,
90+
public IObservable<IBulkAllResponse> Reindex<TSource>(
91+
Func<ReindexDescriptor<TSource,TSource>, IReindexRequest<TSource,TSource>> selector,
4792
CancellationToken cancellationToken = default(CancellationToken)
48-
) where T : class =>
49-
this.Reindex<T>(selector.InvokeOrDefault(new ReindexDescriptor<T>()));
93+
)
94+
where TSource : class =>
95+
this.Reindex(selector.InvokeOrDefault(new ReindexDescriptor<TSource,TSource>(s=>s)));
5096

5197
/// <inheritdoc />
52-
public IObservable<IBulkAllResponse> Reindex<T>(
53-
IndexName fromIndex,
54-
IndexName toIndex,
55-
Func<QueryContainerDescriptor<T>, QueryContainer> selector = null,
98+
public IObservable<IBulkAllResponse> Reindex<TSource,TTarget>(
99+
Func<TSource, TTarget> mapper,
100+
Func<ReindexDescriptor<TSource,TTarget>, IReindexRequest<TSource,TTarget>> selector,
56101
CancellationToken cancellationToken = default(CancellationToken)
57-
) where T : class =>
58-
this.Reindex<T>(r => r
59-
.ScrollAll("1m", -1, search => search.Search(ss => ss.Index(fromIndex).Query(selector)))
60-
.BulkAll(b => b.Index(toIndex))
61-
, cancellationToken);
102+
)
103+
where TTarget : class
104+
where TSource : class =>
105+
this.Reindex(selector.InvokeOrDefault(new ReindexDescriptor<TSource,TTarget>(mapper)));
106+
107+
/// <inheritdoc />
108+
public IObservable<IBulkAllResponse> Reindex<TSource>(
109+
IReindexRequest<TSource> request,
110+
CancellationToken cancellationToken = default(CancellationToken)
111+
)
112+
where TSource : class =>
113+
this.Reindex<TSource, TSource>(request, cancellationToken);
62114

63115

64116
/// <inheritdoc />
65-
public IObservable<IBulkAllResponse> Reindex<T>(
66-
IReindexRequest<T> request,
117+
public IObservable<IBulkAllResponse> Reindex<TSource,TTarget>(
118+
IReindexRequest<TSource,TTarget> request,
67119
CancellationToken cancellationToken = default(CancellationToken)
68-
) where T : class
120+
)
121+
where TTarget : class
122+
where TSource : class
69123
{
70124
if (request.ScrollAll == null)
71125
throw new ArgumentException(
72-
"Reindex must have ScrollAll property set so we know how to get the source of the reindex operation");
126+
"ScrollAll property must be set in order to get the source of a Reindex operation");
73127
if (request.BulkAll == null)
74128
throw new ArgumentException(
75-
"Reindex must have BulkAll property set so we know how to get the target of the reindex operation");
76-
return new ReindexObservable<T>(this, ConnectionSettings, request, cancellationToken);
129+
"BulkAll property must set in order to get the target of a Reindex operation");
130+
return new ReindexObservable<TSource,TTarget>(this, ConnectionSettings, request, cancellationToken);
131+
}
132+
133+
/// <inheritdoc />
134+
public IObservable<IBulkAllResponse> Reindex<TSource,TTarget>(
135+
IndexName fromIndex,
136+
IndexName toIndex,
137+
Func<TSource, TTarget> mapper,
138+
Func<QueryContainerDescriptor<TSource>, QueryContainer> selector = null,
139+
CancellationToken cancellationToken = default(CancellationToken)
140+
)
141+
where TTarget : class
142+
where TSource : class =>
143+
this.Reindex(
144+
mapper,
145+
SimplifiedReindexer<TSource, TTarget>(fromIndex, toIndex, selector)
146+
, cancellationToken);
147+
148+
/// <inheritdoc />
149+
public IObservable<IBulkAllResponse> Reindex<TSource>(
150+
IndexName fromIndex,
151+
IndexName toIndex,
152+
Func<QueryContainerDescriptor<TSource>, QueryContainer> selector = null,
153+
CancellationToken cancellationToken = default(CancellationToken)
154+
)
155+
where TSource : class =>
156+
this.Reindex(
157+
s=>s,
158+
SimplifiedReindexer<TSource, TSource>(fromIndex, toIndex, selector)
159+
, cancellationToken);
160+
161+
private static Func<ReindexDescriptor<TSource,TTarget>, IReindexRequest<TSource,TTarget>> SimplifiedReindexer<TSource, TTarget>(
162+
IndexName fromIndex,
163+
IndexName toIndex,
164+
Func<QueryContainerDescriptor<TSource>, QueryContainer> selector
165+
)
166+
where TTarget : class
167+
where TSource : class
168+
{
169+
return r => r
170+
.ScrollAll("1m", -1, search => search
171+
.Search(ss => ss
172+
.Size(CoordinatedRequestDefaults.ReindexScrollSize)
173+
.Index(fromIndex)
174+
.Query(selector)
175+
)
176+
)
177+
.BulkAll(b => b.Index(toIndex));
77178
}
179+
180+
78181
}
79182
}

0 commit comments

Comments
 (0)