Skip to content

Finish MVP for backend search service implementation #2330

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: feature/search-service
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions agni/api/app/foxcomm/agni/api/Api.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import com.twitter.finagle.http.Status
import com.twitter.util.Await
import foxcomm.agni._
import foxcomm.agni.dsl.query._
import foxcomm.agni.interpreter.es.queryInterpreter
import foxcomm.agni.interpreter.es._
import foxcomm.utils.finch._
import io.circe.generic.extras.auto._
import io.finch._
Expand All @@ -15,7 +15,7 @@ import org.elasticsearch.common.ValidationException

object Api extends App {
def endpoints(searchService: SearchService)(implicit s: Scheduler) =
post("api" :: "search" :: "translate" :: jsonBody[SearchPayload.fc]) { (searchPayload: SearchPayload.fc) ⇒
post("api" :: "search" :: "translate" :: jsonBody[SearchPayload]) { (searchPayload: SearchPayload) ⇒
searchService
.translate(searchPayload = searchPayload)
.map(Ok)
Expand Down Expand Up @@ -44,7 +44,7 @@ object Api extends App {

implicit val s: Scheduler = Scheduler.global
val config = AppConfig.load()
val svc = SearchService.fromConfig(config, queryInterpreter)
val svc = SearchService.fromConfig(config, dslInterpreter)

Await.result(
Http.server
Expand Down
47 changes: 14 additions & 33 deletions agni/core/app/foxcomm/agni/SearchService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,25 @@ import foxcomm.agni.interpreter.es._
import io.circe._
import io.circe.jawn.parseByteBuffer
import monix.eval.{Coeval, Task}
import org.elasticsearch.action.search.{SearchAction, SearchRequestBuilder, SearchResponse}
import org.elasticsearch.action.search.{SearchAction, SearchRequest, SearchRequestBuilder, SearchResponse}
import org.elasticsearch.client.Client
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.InetSocketTransportAddress
import org.elasticsearch.common.xcontent.{ToXContent, XContentFactory}
import org.elasticsearch.index.query.QueryBuilder
import org.elasticsearch.search.SearchHit

@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements"))
class SearchService private (client: Client, qi: ESQueryInterpreter) {
class SearchService private (client: Client, interpreter: ESSearchInterpreter) {
import SearchService.ExtractJsonObject

def translate(searchPayload: SearchPayload.fc): Task[Json] = {
def buildJson(qb: QueryBuilder): Coeval[Json] =
Coeval.eval {
val builder = XContentFactory.jsonBuilder()
builder.prettyPrint()
builder.startObject()
builder.field("query")
qb.toXContent(builder, ToXContent.EMPTY_PARAMS)
builder.endObject()
parseByteBuffer(builder.bytes().toChannelBuffer.toByteBuffer)
.fold(Coeval.raiseError(_), Coeval.eval(_))
}.flatten
def translate(searchPayload: SearchPayload): Task[Json] = {
def buildJson(req: SearchRequest): Coeval[Json] =
parseByteBuffer(req.source().toChannelBuffer.toByteBuffer)
.fold(Coeval.raiseError(_), Coeval.eval(_))

for {
builder ← qi(searchPayload.query).task
json ← buildJson(builder).task
req ← interpreter(searchPayload → new SearchRequestBuilder(client, SearchAction.INSTANCE)).task
json ← buildJson(req).task
} yield json
}

Expand All @@ -49,22 +39,13 @@ class SearchService private (client: Client, qi: ESQueryInterpreter) {
.setTypes(searchType)
.setSize(searchSize)
searchFrom.foreach(builder.setFrom)
searchPayload.fields.foreach(fs ⇒ builder.setFetchSource(fs.toList.toArray, Array.empty[String]))
builder
}

def evalQuery(builder: SearchRequestBuilder): Coeval[SearchRequestBuilder] = searchPayload match {
case SearchPayload.es(query, _) ⇒
Coeval.eval(builder.setQuery(Json.fromJsonObject(query).toBytes))
case SearchPayload.fc(query, _) ⇒
qi(query).map(builder.setQuery)
}

def setupBuilder: Task[SearchRequestBuilder] = (prepareBuilder flatMap evalQuery).task
def searchRequest: Task[SearchRequest] = prepareBuilder.flatMap(b ⇒ interpreter(searchPayload → b)).task

for {
builder ← setupBuilder
request = builder.request()
request ← searchRequest
response ← async[SearchResponse, SearchResult](client.search(request, _))
} yield {
val hits = response.getHits
Expand All @@ -90,10 +71,10 @@ object SearchService {
.flatMap(_.asObject)
}

def apply(client: Client, qi: ESQueryInterpreter): SearchService =
new SearchService(client, qi)
def apply(client: Client, interpreter: ESSearchInterpreter): SearchService =
new SearchService(client, interpreter)

def fromConfig(config: AppConfig, qi: ESQueryInterpreter): SearchService = {
def fromConfig(config: AppConfig, interpreter: ESSearchInterpreter): SearchService = {
val esConfig = config.elasticsearch
val settings =
Settings.settingsBuilder().put("cluster.name", esConfig.cluster).build()
Expand All @@ -103,6 +84,6 @@ object SearchService {
.build()
.addTransportAddresses(esConfig.host.toList.map(new InetSocketTransportAddress(_)): _*)

apply(client, qi)
apply(client, interpreter)
}
}
57 changes: 57 additions & 0 deletions agni/core/app/foxcomm/agni/dsl/aggregations.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package foxcomm.agni.dsl

import cats.data.NonEmptyList
import cats.implicits._
import io.circe._
import io.circe.generic.extras.auto._

object aggregations {
sealed trait AggregationFunction {
def name: String

def `type`: String

final def tpe: String = `type`
}
object AggregationFunction {
sealed trait WithMetadata { this: AggregationFunction ⇒
def meta: Option[JsonObject]
}

final case class raw private (name: String, `type`: String, meta: Option[JsonObject], value: JsonObject)
extends AggregationFunction
with WithMetadata

implicit val decodeAggFunction: Decoder[AggregationFunction] = {
val all: Map[String, Decoder[_ <: AggregationFunction]] = Map.empty.withDefaultValue(Decoder[raw])

Decoder.instance { hc ⇒
val c = hc.downField(Discriminator)
val tpe = c.focus.flatMap(_.asString)
tpe match {
case Some(t) ⇒ all(t).tryDecode(hc)
case None ⇒ Either.left(DecodingFailure("Unknown aggregation function type", c.history))
}
}
}
}

final case class FCAggregation(aggs: Option[NonEmptyList[AggregationFunction]])
object FCAggregation {
implicit val decodeFCAggregation: Decoder[FCAggregation] = {
Decoder
.decodeOption(Decoder.decodeNonEmptyList[AggregationFunction].emap { aggs ⇒
aggs
.foldM[Either[String, ?], Set[String]](Set.empty[String])(
(defined, agg) ⇒
if (defined.contains(agg.name))
Either.left(
s"Cannot have multiple aggregations with the same name: ${agg.name} is defined twice")
else Either.right(defined))
.right
.map(_ ⇒ aggs)
} or Decoder[AggregationFunction].map(NonEmptyList.of(_)))
.map(FCAggregation(_))
}
}
}
11 changes: 11 additions & 0 deletions agni/core/app/foxcomm/agni/dsl/package.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,18 @@
package foxcomm.agni

import io.circe.Decoder
import io.circe.generic.extras.Configuration
import shapeless._

package object dsl {
val Discriminator: String = foxcomm.agni.Discriminator

implicit def configuration: Configuration = foxcomm.agni.configuration

/** Decodes coproduct assuming that json representations of each coproduct element are disjoint. */
implicit def decodeCoproduct[H: Decoder, T <: Coproduct: Decoder]: Decoder[H :+: T] =
Decoder[H].map(Inl(_)) or Decoder[T].map(Inr(_))

implicit def decodeCoproductLeaf[L: Decoder]: Decoder[L :+: CNil] =
Decoder[L].map(Inl(_))
}
Loading