(dsl): Support IP range aggregation #650

Open · wants to merge 15 commits into main
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
* text=auto eol=lf
sbt linguist-vendored
61 changes: 61 additions & 0 deletions docs/overview/aggregations/elastic_aggregation_ip_range.md
@@ -0,0 +1,61 @@
---
id: elastic_aggregation_ip_range
title: "IP Range Aggregation"
---

The `IpRange` aggregation is a multi-bucket aggregation that creates buckets for ranges of IP addresses, defined either with `from`/`to` values or with CIDR masks.

To use the `IpRange` aggregation, import the following:
```scala
import zio.elasticsearch.aggregation.IpRangeAggregation
import zio.elasticsearch.ElasticAggregation.ipRangeAggregation
```

You can create a [type-safe](https://lambdaworks.github.io/zio-elasticsearch/overview/overview_zio_prelude_schema) `IpRange` aggregation using the `ipRangeAggregation` method this way:
```scala
val aggregation: IpRangeAggregation =
  ipRangeAggregation(
    name = "ip_range_agg",
    field = Document.stringField,
    ranges = NonEmptyChunk(
      IpRange.IpRangeBound(to = Some("10.0.0.5")),
      IpRange.IpRangeBound(from = Some("10.0.0.5"))
    )
  )
```

You can create an `IpRange` aggregation, passing the field name as a string, using the `ipRangeAggregation` method this way:
```scala
val aggregation: IpRangeAggregation =
  ipRangeAggregation(
    name = "ip_range_agg",
    field = "ipField",
    ranges = NonEmptyChunk(
      IpRange.IpRangeBound(to = Some("10.0.0.5")),
      IpRange.IpRangeBound(from = Some("10.0.0.5"))
    )
  )
```

You can also use CIDR masks for ranges:
```scala
val cidrAggregation: IpRangeAggregation =
  ipRangeAggregation(
    name = "cidr_agg",
    field = "ipField",
    ranges = NonEmptyChunk(
      IpRange.IpRangeBound(mask = Some("10.0.0.0/25")),
      IpRange.IpRangeBound(mask = Some("10.0.0.128/25"))
    )
  )
```
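Based on the `toJson` serializer added in this PR, the `cidrAggregation` above should produce roughly the following request body (a sketch derived from reading the serializer, not captured output):

```json
{
  "cidr_agg": {
    "ip_range": {
      "field": "ipField",
      "ranges": [
        { "mask": "10.0.0.0/25" },
        { "mask": "10.0.0.128/25" }
      ]
    }
  }
}
```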

If you want to explicitly set the `keyed` property and combine the aggregation with another one:
```scala
val multipleAggregations =
  ipRangeAggregation("ip_range_agg", "ipField", NonEmptyChunk(IpRange.IpRangeBound(to = Some("10.0.0.5"))))
    .keyedOn
    .withAgg(maxAggregation("maxAgg", "someField"))
```
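As a sketch mirroring the integration tests in this PR, a constructed aggregation can be executed and its results read like this (here `index` and `aggregation` are assumed to be an `IndexName` and an `IpRangeAggregation` already in scope):

```scala
import zio.elasticsearch.{ElasticRequest, Executor}

// Execute the aggregation against an index and extract the
// aggregation results from the response.
val result =
  Executor
    .execute(ElasticRequest.aggregate(index, aggregation))
    .aggregations
```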

You can find more information about the `IpRange` aggregation [here](https://www.elastic.co/docs/reference/aggregations/search-aggregations-bucket-iprange-aggregation).
This file still has formatting issues, and it also contains special ^M (carriage return) characters.

@@ -16,12 +16,12 @@

package zio.elasticsearch

import zio.Chunk
import zio.elasticsearch.ElasticAggregation._
import zio.elasticsearch.ElasticHighlight.highlight
import zio.elasticsearch.ElasticQuery.{script => _, _}
import zio.elasticsearch.ElasticSort.sortBy
import zio.elasticsearch.aggregation.AggregationOrder
import zio.elasticsearch.aggregation.IpRange.IpRangeBound
import zio.elasticsearch.aggregation.{AggregationOrder, IpRange}
import zio.elasticsearch.data.GeoPoint
import zio.elasticsearch.domain.{PartialTestDocument, TestDocument, TestSubDocument}
import zio.elasticsearch.executor.Executor
@@ -41,6 +41,7 @@ import zio.stream.{Sink, ZSink}
import zio.test.Assertion._
import zio.test.TestAspect._
import zio.test._
import zio.{Chunk, NonEmptyChunk}

import java.time.LocalDate
import scala.util.Random
@@ -467,6 +468,78 @@ object HttpExecutorSpec extends IntegrationSpec {
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
),
test("aggregate using IpRange aggregation") {
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
(firstDocumentId, firstDocument, secondDocumentId, secondDocument) =>
val updatedA = firstDocument.copy(stringField = "10.0.0.10")
val updatedB = secondDocument.copy(stringField = "10.0.0.200")

for {
_ <- Executor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll))

_ <-
Executor.execute(ElasticRequest.upsert[TestDocument](firstSearchIndex, firstDocumentId, updatedA))
_ <- Executor.execute(
ElasticRequest
.upsert[TestDocument](firstSearchIndex, secondDocumentId, updatedB)
.refreshTrue
)

aggregation = IpRange(
name = "ip_ranges",
field = "ipField",
ranges = NonEmptyChunk(
IpRangeBound(to = Some("10.0.0.5")),
IpRangeBound(from = Some("10.0.0.5"))
),
keyed = None,
subAggregations = None
)

result <-
Executor
.execute(ElasticRequest.aggregate(firstSearchIndex, aggregation))
.aggregations
} yield assert(result)(isNonEmpty)
}
} @@ around(
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
),
test("aggregate using Ip range aggregation with CIDR masks") {
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument) { (docId1, doc1, docId2, doc2) =>
val updated1 = doc1.copy(stringField = "10.0.0.10")
val updated2 = doc2.copy(stringField = "10.0.0.120")

for {
_ <- Executor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll))
_ <- Executor.execute(ElasticRequest.upsert[TestDocument](firstSearchIndex, docId1, updated1))
_ <- Executor.execute(
ElasticRequest
.upsert[TestDocument](firstSearchIndex, docId2, updated2)
.refreshTrue
)

aggregation = IpRange(
name = "cidr_agg",
field = "ipField",
ranges = NonEmptyChunk(
IpRangeBound(mask = Some("10.0.0.0/25")),
IpRangeBound(mask = Some("10.0.0.128/25"))
),
keyed = None,
subAggregations = None
)

result <- Executor
.execute(ElasticRequest.aggregate(firstSearchIndex, aggregation))
.aggregations
} yield assert(result)(isNonEmpty)
}
} @@ around(
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
),
test("aggregate using terms aggregation with max aggregation as a sub aggregation") {
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
(firstDocumentId, firstDocument, secondDocumentId, secondDocument) =>
@@ -1064,7 +1137,7 @@ object HttpExecutorSpec extends IntegrationSpec {
_ <- Executor.execute(ElasticRequest.bulk(req1, req2, req3).refreshTrue)
query = ElasticQuery.kNN(TestDocument.vectorField, 2, 3, Chunk(-5.0, 9.0, -12.0))
res <- Executor.execute(ElasticRequest.knnSearch(firstSearchIndex, query)).documentAs[TestDocument]
} yield (assert(res)(equalTo(Chunk(firstDocumentUpdated, thirdDocumentUpdated))))
} yield assert(res)(equalTo(Chunk(firstDocumentUpdated, thirdDocumentUpdated)))
}
} @@ around(
Executor.execute(
@@ -16,10 +16,11 @@

package zio.elasticsearch

import zio.Chunk
import zio.elasticsearch.aggregation.IpRange.IpRangeBound
import zio.elasticsearch.aggregation._
import zio.elasticsearch.query.ElasticQuery
import zio.elasticsearch.script.Script
import zio.{Chunk, NonEmptyChunk}

object ElasticAggregation {

@@ -160,6 +161,45 @@ object ElasticAggregation {
final def filterAggregation(name: String, query: ElasticQuery[_]): FilterAggregation =
Filter(name = name, query = query, subAggregations = Chunk.empty)

/**
* Constructs a type-safe instance of [[zio.elasticsearch.aggregation.IpRangeAggregation]] using the specified
* parameters.
*
* @param name
* Aggregation name.
* @param field
* The type-safe field for which the IP range aggregation will be executed.
* @param ranges
* A non-empty chunk of IP range bounds specifying the ranges.
* @return
* An instance of [[IpRangeAggregation]] that represents the IP range aggregation to be performed.
*/
def ipRangeAggregation(
name: String,
field: Field[_, String],
ranges: NonEmptyChunk[IpRangeBound]
): IpRangeAggregation =
IpRange(name = name, field = field.toString, ranges = ranges, keyed = None, subAggregations = None)

/**
* Constructs an instance of [[zio.elasticsearch.aggregation.IpRangeAggregation]] using the specified parameters.
*
* @param name
* Aggregation name.
* @param field
* The field (as a string) for which the IP range aggregation will be executed.
* @param ranges
* A non-empty chunk of IP range bounds specifying the ranges.
* @return
* An instance of [[IpRangeAggregation]] configured with the provided parameters.
*/
def ipRangeAggregation(name: String, field: String, ranges: NonEmptyChunk[IpRangeBound]): IpRangeAggregation =
IpRange(name = name, field = field, ranges = ranges, keyed = None, subAggregations = None)

/**
* Constructs a type-safe instance of [[zio.elasticsearch.aggregation.MaxAggregation]] using the specified parameters.
*
@@ -16,15 +16,16 @@

package zio.elasticsearch.aggregation

import zio.Chunk
import zio.elasticsearch.ElasticAggregation.multipleAggregations
import zio.elasticsearch.ElasticPrimitive.ElasticPrimitiveOps
import zio.elasticsearch.aggregation.IpRange.IpRangeBound
import zio.elasticsearch.aggregation.options._
import zio.elasticsearch.query.ElasticQuery
import zio.elasticsearch.query.sort.Sort
import zio.elasticsearch.script.Script
import zio.json.ast.Json
import zio.json.ast.Json.{Arr, Obj}
import zio.{Chunk, NonEmptyChunk}

sealed trait ElasticAggregation { self =>
private[elasticsearch] def toJson: Json
@@ -205,6 +206,71 @@ private[elasticsearch] final case class Filter(
}
}

sealed trait IpRangeAggregation extends SingleElasticAggregation with WithAgg with WithSubAgg[IpRangeAggregation]

private[elasticsearch] final case class IpRange(
name: String,
field: String,
ranges: NonEmptyChunk[IpRangeBound],
keyed: Option[Boolean],
subAggregations: Option[Chunk[SingleElasticAggregation]]
) extends IpRangeAggregation { self =>

def keyedOn: IpRangeAggregation = self.copy(keyed = Some(true))
I guess we can use only `keyed`.


def withAgg(aggregation: SingleElasticAggregation): MultipleAggregations =
multipleAggregations.aggregations(self, aggregation)

def withSubAgg(aggregation: SingleElasticAggregation): IpRangeAggregation =
self.copy(subAggregations = Some(aggregation +: subAggregations.getOrElse(Chunk.empty)))

private[elasticsearch] def toJson: Json = {

val rangesJson = ranges.map(_.toJson)
val keyedJson = keyed.fold(Obj())(k => Obj("keyed" -> k.toJson))
val subAggsJson = subAggregations match {
case Some(aggs) if aggs.nonEmpty =>
Obj("aggs" -> aggs.map(_.toJson).reduce(_ merge _))
case _ => Obj()
}

Obj(
name -> (
Obj("ip_range" -> (Obj("field" -> field.toJson, "ranges" -> Arr(rangesJson)) merge keyedJson)) merge subAggsJson
)
)
}
}

object IpRange {

final case class IpRangeBound(
from: Option[String] = None,
to: Option[String] = None,
mask: Option[String] = None,
key: Option[String] = None
) { self =>

def from(value: String): IpRangeBound = self.copy(from = Some(value))

def to(value: String): IpRangeBound = self.copy(to = Some(value))

def mask(value: String): IpRangeBound = self.copy(mask = Some(value))

def key(value: String): IpRangeBound = self.copy(key = Some(value))

def toJson: Json = {
Can we put `private[elasticsearch]` here?

val baseFields = Chunk.empty[(String, Json)] ++
from.map("from" -> _.toJson) ++
to.map("to" -> _.toJson) ++
mask.map("mask" -> _.toJson) ++
key.map("key" -> _.toJson)

Obj(baseFields: _*)
}
}
}

sealed trait MaxAggregation extends SingleElasticAggregation with HasMissing[MaxAggregation] with WithAgg

private[elasticsearch] final case class Max(name: String, field: String, missing: Option[Double])
@@ -77,6 +77,18 @@ object AggregationResponse {
(key, toResult(response))
})
)
case IpRangeAggregationResponse(buckets) =>
IpRangeAggregationResult(
buckets.map(b =>
IpRangeAggregationBucketResult(
key = b.key,
from = b.from,
to = b.to,
docCount = b.docCount,
subAggregations = Map.empty
@dbulaja98, up there we have `withSubAggregations`. Is this the correct implementation, then?

)
)
)
case MaxAggregationResponse(value) =>
MaxAggregationResult(value)
case MinAggregationResponse(value) =>
@@ -157,6 +169,8 @@ private[elasticsearch] case class BucketDecoder(fields: Chunk[(String, Json)]) e
)
case str if str.contains("filter#") =>
Some(field -> data.unsafeAs[FilterAggregationResponse](FilterAggregationResponse.decoder))
case str if str.contains("ip_range#") =>
Some(field -> data.unsafeAs[IpRangeAggregationResponse](IpRangeAggregationResponse.decoder))
case str if str.contains("max#") =>
Some(field -> MaxAggregationResponse(value = objFields("value").unsafeAs[Double]))
case str if str.contains("min#") =>
@@ -202,6 +216,8 @@ private[elasticsearch] case class BucketDecoder(fields: Chunk[(String, Json)]) e
(field.split("#")(1), data.asInstanceOf[ExtendedStatsAggregationResponse])
case str if str.contains("filter#") =>
(field.split("#")(1), data.asInstanceOf[FilterAggregationResponse])
case str if str.contains("ip_range#") =>
(field.split("#")(1), data.asInstanceOf[IpRangeAggregationResponse])
case str if str.contains("max#") =>
(field.split("#")(1), data.asInstanceOf[MaxAggregationResponse])
case str if str.contains("min#") =>
@@ -285,6 +301,27 @@ private[elasticsearch] sealed trait JsonDecoderOps {
}
}

private[elasticsearch] final case class IpRangeAggregationBucket(
key: String,
from: Option[String],
to: Option[String],
@jsonField("doc_count")
docCount: Int
) extends AggregationBucket

private[elasticsearch] object IpRangeAggregationBucket {
implicit val decoder: JsonDecoder[IpRangeAggregationBucket] = DeriveJsonDecoder.gen[IpRangeAggregationBucket]
}

private[elasticsearch] final case class IpRangeAggregationResponse(
buckets: Chunk[IpRangeAggregationBucket]
) extends AggregationResponse

private[elasticsearch] object IpRangeAggregationResponse {
implicit val decoder: JsonDecoder[IpRangeAggregationResponse] =
DeriveJsonDecoder.gen[IpRangeAggregationResponse]
}

private[elasticsearch] final case class MaxAggregationResponse(value: Double) extends AggregationResponse

private[elasticsearch] object MaxAggregationResponse {
@@ -82,6 +82,8 @@ private[elasticsearch] final case class SearchWithAggregationsResponse(
ExtendedStatsAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("filter#") =>
FilterAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("ip_range#") =>
IpRangeAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("max#") =>
MaxAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("min#") =>