Skip to content

Commit 357cac9

Browse files
authored
Finally tagless API (#12)
Finally tagless API
1 parent 573e32e commit 357cac9

File tree

13 files changed

+100
-65
lines changed

13 files changed

+100
-65
lines changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ method is obviously `POST`. The _info_ endpoint is supposed to be exposed on the
3030
compile 'com.avast.grpc:grpc-json-bridge-core_2.12:x.x.x'
3131
```
3232

33-
#### Gradle
33+
#### SBT
3434
```scala
3535
libraryDependencies += "com.avast.grpc" %% "grpc-json-bridge-core" % "x.x.x"
3636
```
@@ -72,7 +72,7 @@ val bridge = new TestApiServiceImplBase {
7272
responseObserver.onNext(GetResponse.newBuilder().putResults("name", 42).build())
7373
responseObserver.onCompleted()
7474
}
75-
}.createGrpcJsonBridge[TestApiServiceFutureStub]() // this does the magic!
75+
}.createGrpcJsonBridge[Task, TestApiServiceFutureStub]() // this does the magic!
7676
```
7777
or you can even go with the [Cactus](https://github.com/avast/cactus) and let it map the GPB messages to your case classes:
7878
```scala
@@ -107,7 +107,7 @@ val service = new MyApi {
107107
}
108108
}.mappedToService[TestApiServiceImplBase]() // cactus mapping
109109

110-
val bridge = service.createGrpcJsonBridge[TestApiServiceFutureStub]()
110+
val bridge = service.createGrpcJsonBridge[Task, TestApiServiceFutureStub]()
111111
```
112112

113113
### Calling the bridged service

akka-http/README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ The [gRPC Json Bridge](../README.md) integration with [Akka Http](https://doc.ak
99
compile 'com.avast.grpc:grpc-json-bridge-akkahttp_2.12:x.x.x'
1010
```
1111

12-
### Gradle
12+
### SBT
1313
```scala
1414
libraryDependencies += "com.avast.grpc" %% "grpc-json-bridge-akkahttp" % "x.x.x"
1515
```
@@ -22,7 +22,9 @@ import com.avast.grpc.jsonbridge.GrpcJsonBridge
2222
import com.avast.grpc.jsonbridge.akkahttp.{AkkaHttp, Configuration}
2323
import com.avast.grpc.jsonbridge.test.TestApiServiceGrpc.TestApiServiceImplBase
2424

25-
val bridge: GrpcJsonBridge[TestApiServiceImplBase] = ??? // see core module docs for info about creating the bridge
25+
implicit val scheduler: monix.execution.Scheduler = ???
26+
27+
val bridge: GrpcJsonBridge[Task, TestApiServiceImplBase] = ??? // see core module docs for info about creating the bridge
2628

2729
val route: Route = AkkaHttp(Configuration.Default)(bridge)
2830
```

akka-http/src/main/scala/com/avast/grpc/jsonbridge/akkahttp/AkkaHttp.scala

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@ import akka.http.scaladsl.model.headers.`Content-Type`
55
import akka.http.scaladsl.server.Directives._
66
import akka.http.scaladsl.server.{PathMatcher, Route}
77
import cats.data.NonEmptyList
8-
import com.avast.grpc.jsonbridge.GrpcJsonBridge
98
import com.avast.grpc.jsonbridge.GrpcJsonBridge.GrpcHeader
9+
import com.avast.grpc.jsonbridge.{GrpcJsonBridge, ToTask}
1010
import io.grpc.BindableService
1111
import io.grpc.Status.Code
12+
import monix.execution.Scheduler
1213

14+
import scala.language.higherKinds
1315
import scala.util.control.NonFatal
1416
import scala.util.{Failure, Success}
1517

@@ -19,8 +21,9 @@ object AkkaHttp {
1921
ContentType.WithMissingCharset(MediaType.applicationWithOpenCharset("json"))
2022
}
2123

22-
def apply(configuration: Configuration)(bridges: GrpcJsonBridge[_ <: BindableService]*): Route = {
23-
val services = bridges.map(s => (s.serviceName, s): (String, GrpcJsonBridge[_])).toMap
24+
def apply[F[_]: ToTask](configuration: Configuration)(bridges: GrpcJsonBridge[F, _ <: BindableService]*)(
25+
implicit sch: Scheduler): Route = {
26+
val services = bridges.map(s => (s.serviceName, s): (String, GrpcJsonBridge[F, _])).toMap
2427

2528
val pathPattern = configuration.pathPrefix
2629
.map {
@@ -42,7 +45,11 @@ object AkkaHttp {
4245
services.get(serviceName) match {
4346
case Some(service) =>
4447
entity(as[String]) { json =>
45-
onComplete(service.invokeGrpcMethod(methodName, json, mapHeaders(req.headers))) {
48+
val methodCall = implicitly[ToTask[F]].apply {
49+
service.invokeGrpcMethod(methodName, json, mapHeaders(req.headers))
50+
}.runAsync
51+
52+
onComplete(methodCall) {
4653
case Success(Right(r)) =>
4754
respondWithHeader(JsonContentType) {
4855
complete(r)

akka-http/src/test/scala/com/avast/grpc/jsonbridge/akkahttp/AkkaHttpTest.scala

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import com.avast.grpc.jsonbridge.test.TestApi.{GetRequest, GetResponse}
1111
import com.avast.grpc.jsonbridge.test.TestApiServiceGrpc.{TestApiServiceFutureStub, TestApiServiceImplBase}
1212
import io.grpc._
1313
import io.grpc.stub.StreamObserver
14+
import monix.eval.Task
1415
import org.scalatest.FunSuite
1516

1617
import scala.collection.JavaConverters._
@@ -37,9 +38,9 @@ class AkkaHttpTest extends FunSuite with ScalatestRouteTest {
3738
responseObserver.onNext(GetResponse.newBuilder().putResults("name", 42).build())
3839
responseObserver.onCompleted()
3940
}
40-
}.createGrpcJsonBridge[TestApiServiceFutureStub]()
41+
}.createGrpcJsonBridge[Task, TestApiServiceFutureStub]()
4142

42-
val route = AkkaHttp(Configuration.Default)(bridge)
43+
val route = AkkaHttp[Task](Configuration.Default)(bridge)(implicitly[ToTask[Task]], monix.execution.Scheduler.Implicits.global)
4344

4445
Post(s"/${classOf[TestApiServiceImplBase].getName.replace("$", ".")}/Get", """ { "names": ["abc","def"] } """)
4546
.withHeaders(AkkaHttp.JsonContentType) ~> route ~> check {
@@ -62,11 +63,11 @@ class AkkaHttpTest extends FunSuite with ScalatestRouteTest {
6263
responseObserver.onNext(GetResponse.newBuilder().putResults("name", 42).build())
6364
responseObserver.onCompleted()
6465
}
65-
}.createGrpcJsonBridge[TestApiServiceFutureStub]()
66+
}.createGrpcJsonBridge[Task, TestApiServiceFutureStub]()
6667

6768
val configuration = Configuration.Default.copy(pathPrefix = Some(NonEmptyList.of("abc", "def")))
6869

69-
val route = AkkaHttp(configuration)(bridge)
70+
val route = AkkaHttp(configuration)(bridge)(implicitly[ToTask[Task]], monix.execution.Scheduler.Implicits.global)
7071

7172
Post(s"/abc/def/${classOf[TestApiServiceImplBase].getName.replace("$", ".")}/Get", """ { "names": ["abc","def"] } """)
7273
.withHeaders(AkkaHttp.JsonContentType) ~> route ~> check {
@@ -87,9 +88,9 @@ class AkkaHttpTest extends FunSuite with ScalatestRouteTest {
8788
responseObserver.onNext(GetResponse.newBuilder().putResults("name", 42).build())
8889
responseObserver.onCompleted()
8990
}
90-
}.createGrpcJsonBridge[TestApiServiceFutureStub]()
91+
}.createGrpcJsonBridge[Task, TestApiServiceFutureStub]()
9192

92-
val route = AkkaHttp(Configuration.Default)(bridge)
93+
val route = AkkaHttp(Configuration.Default)(bridge)(implicitly[ToTask[Task]], monix.execution.Scheduler.Implicits.global)
9394

9495
// empty body
9596
Post(s"/${classOf[TestApiServiceImplBase].getName.replace("$", ".")}/Get", "")
@@ -108,9 +109,9 @@ class AkkaHttpTest extends FunSuite with ScalatestRouteTest {
108109
override def get(request: GetRequest, responseObserver: StreamObserver[TestApi.GetResponse]): Unit = {
109110
responseObserver.onError(new StatusException(Status.PERMISSION_DENIED))
110111
}
111-
}.createGrpcJsonBridge[TestApiServiceFutureStub]()
112+
}.createGrpcJsonBridge[Task, TestApiServiceFutureStub]()
112113

113-
val route = AkkaHttp(Configuration.Default)(bridge)
114+
val route = AkkaHttp(Configuration.Default)(bridge)(implicitly[ToTask[Task]], monix.execution.Scheduler.Implicits.global)
114115

115116
Post(s"/${classOf[TestApiServiceImplBase].getName.replace("$", ".")}/Get", """ { "names": ["abc","def"] } """)
116117
.withHeaders(AkkaHttp.JsonContentType) ~> route ~> check {
@@ -119,9 +120,9 @@ class AkkaHttpTest extends FunSuite with ScalatestRouteTest {
119120
}
120121

121122
test("provides service description") {
122-
val bridge = new TestApiServiceImplBase {}.createGrpcJsonBridge[TestApiServiceFutureStub]()
123+
val bridge = new TestApiServiceImplBase {}.createGrpcJsonBridge[Task, TestApiServiceFutureStub]()
123124

124-
val route = AkkaHttp(Configuration.Default)(bridge)
125+
val route = AkkaHttp(Configuration.Default)(bridge)(implicitly[ToTask[Task]], monix.execution.Scheduler.Implicits.global)
125126

126127
Get(s"/${classOf[TestApiServiceImplBase].getName.replace("$", ".")}") ~> route ~> check {
127128
assertResult(StatusCodes.OK)(status)
@@ -143,7 +144,7 @@ class AkkaHttpTest extends FunSuite with ScalatestRouteTest {
143144
responseObserver.onNext(GetResponse.newBuilder().putResults("name", 42).build())
144145
responseObserver.onCompleted()
145146
}
146-
}.createGrpcJsonBridge[TestApiServiceFutureStub](
147+
}.createGrpcJsonBridge[Task, TestApiServiceFutureStub](
147148
new ServerInterceptor {
148149
override def interceptCall[ReqT, RespT](call: ServerCall[ReqT, RespT],
149150
headers: Metadata,
@@ -154,7 +155,7 @@ class AkkaHttpTest extends FunSuite with ScalatestRouteTest {
154155
}
155156
)
156157

157-
val route = AkkaHttp(Configuration.Default)(bridge)
158+
val route = AkkaHttp(Configuration.Default)(bridge)(implicitly[ToTask[Task]], monix.execution.Scheduler.Implicits.global)
158159

159160
val Ok(customHeaderToBeSent, _) = HttpHeader.parse("The-Header", headerValue)
160161

build.sbt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,9 @@ lazy val core = (project in file("core")).settings(
111111
"com.google.protobuf" % "protobuf-java-util" % Versions.gpb3Version,
112112
"io.grpc" % "grpc-protobuf" % Versions.grpcVersion,
113113
"io.grpc" % "grpc-stub" % Versions.grpcVersion,
114-
"org.typelevel" %% "cats-core" % "1.0.1",
115-
"com.typesafe.scala-logging" %% "scala-logging" % "3.7.2",
114+
"org.typelevel" %% "cats-core" % "1.1.0",
115+
"io.monix" % "monix_2.12" % "3.0.0-RC1",
116+
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.0",
116117
"org.slf4j" % "jul-to-slf4j" % "1.7.25",
117118
"org.slf4j" % "jcl-over-slf4j" % "1.7.25",
118119
"io.grpc" % "grpc-services" % Versions.grpcVersion % "test",

core/src/main/scala/com/avast/grpc/jsonbridge/GrpcJsonBridge.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,17 @@ package com.avast.grpc.jsonbridge
33
import com.avast.grpc.jsonbridge.GrpcJsonBridge.GrpcHeader
44
import io.grpc.{BindableService, Status}
55

6-
import scala.concurrent.Future
6+
import scala.language.higherKinds
77

8-
trait GrpcJsonBridge[Service <: BindableService] extends AutoCloseable {
8+
trait GrpcJsonBridge[F[_], Service <: BindableService] extends AutoCloseable {
99

1010
/** Invokes method with given name, if it exists. The method should never return a failed Future.
1111
*
1212
* @param name Name of the method.
1313
* @param json Method input (JSON string).
1414
* @return Left if there was some error, Right otherwise.
1515
*/
16-
def invokeGrpcMethod(name: String, json: => String, headers: => Seq[GrpcHeader] = Seq.empty): Future[Either[Status, String]]
16+
def invokeGrpcMethod(name: String, json: => String, headers: => Seq[GrpcHeader] = Seq.empty): F[Either[Status, String]]
1717

1818
/** Returns sequence of method names supported by this `GrpcJsonBridge`.
1919
*/

core/src/main/scala/com/avast/grpc/jsonbridge/GrpcJsonBridgeBase.scala

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import com.google.protobuf.util.JsonFormat
66
import com.typesafe.scalalogging.StrictLogging
77
import io.grpc.stub.MetadataUtils
88
import io.grpc.{Metadata, Status, StatusException, StatusRuntimeException}
9+
import monix.eval.Task
910

1011
import scala.concurrent.{ExecutionContext, Future}
1112
import scala.util.control.NonFatal
@@ -18,17 +19,18 @@ trait GrpcJsonBridgeBase[Stub <: io.grpc.stub.AbstractStub[Stub]] extends Strict
1819

1920
// https://groups.google.com/forum/#!topic/grpc-io/1-KMubq1tuc
2021
protected def withNewClientStub[A](headers: Seq[GrpcHeader])(f: Stub => Future[A])(
21-
implicit ec: ExecutionContext): Future[Either[Status, A]] = {
22+
implicit ec: ExecutionContext): Task[Either[Status, A]] = {
2223
val metadata = new Metadata()
2324
headers.foreach(h => metadata.put(Metadata.Key.of(h.name, Metadata.ASCII_STRING_MARSHALLER), h.value))
2425

2526
val clientFutureStub = newFutureStub
2627
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata))
2728

2829
try {
29-
f(clientFutureStub)
30+
Task
31+
.deferFuture(f(clientFutureStub))
3032
.map(Right(_))
31-
.recover {
33+
.onErrorRecover {
3234
case e: StatusException if e.getStatus.getCode == Status.Code.UNKNOWN => Left(Status.INTERNAL)
3335
case e: StatusRuntimeException if e.getStatus.getCode == Status.Code.UNKNOWN => Left(Status.INTERNAL)
3436
case e: StatusException => Left(e.getStatus)
@@ -38,11 +40,11 @@ trait GrpcJsonBridgeBase[Stub <: io.grpc.stub.AbstractStub[Stub]] extends Strict
3840
Left(Status.INTERNAL.withCause(e))
3941
}
4042
} catch {
41-
case e: StatusException if e.getStatus.getCode == Status.Code.UNKNOWN => Future.successful(Left(Status.INTERNAL))
42-
case e: StatusRuntimeException if e.getStatus.getCode == Status.Code.UNKNOWN => Future.successful(Left(Status.INTERNAL))
43+
case e: StatusException if e.getStatus.getCode == Status.Code.UNKNOWN => Task.now(Left(Status.INTERNAL))
44+
case e: StatusRuntimeException if e.getStatus.getCode == Status.Code.UNKNOWN => Task.now(Left(Status.INTERNAL))
4345
case NonFatal(e) =>
4446
logger.debug("Error while executing the request", e)
45-
Future.successful(Left(Status.INTERNAL.withCause(e)))
47+
Task.now(Left(Status.INTERNAL.withCause(e)))
4648
}
4749

4850
// just abandon the stub...

core/src/main/scala/com/avast/grpc/jsonbridge/Macros.scala

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,31 +6,33 @@ import com.google.protobuf.MessageLite
66
import io.grpc.BindableService
77
import io.grpc.stub.AbstractStub
88

9+
import scala.language.higherKinds
910
import scala.reflect.ClassTag
1011
import scala.reflect.macros.blackbox
1112

1213
class Macros(val c: blackbox.Context) {
1314

1415
import c.universe._
1516

16-
def generateGrpcJsonBridge[GrpcServiceStub <: BindableService, GrpcClientStub <: AbstractStub[GrpcClientStub]: WeakTypeTag](
17-
interceptors: c.Tree*)(ec: c.Tree, ex: c.Tree, ct: c.Tree): c.Expr[GrpcJsonBridge[GrpcServiceStub]] = {
17+
def generateGrpcJsonBridge[F[_], GrpcServiceStub <: BindableService, GrpcClientStub <: AbstractStub[GrpcClientStub]: WeakTypeTag](
18+
interceptors: c.Tree*)(ec: c.Tree, ex: c.Tree, ct: c.Tree, ct2: c.Tree): c.Expr[GrpcJsonBridge[F, GrpcServiceStub]] = {
1819

1920
val clientType = weakTypeOf[GrpcClientStub]
2021
val serviceTypeRaw = extractSymbolFromClassTag(ct)
2122
val serviceType = handleCactusType(serviceTypeRaw)
23+
val fType = extractSymbolFromClassTag(ct2).typeSymbol
2224

2325
val channelName = UUID.randomUUID().toString
2426

2527
val stub = {
2628
q" ${clientType.typeSymbol.owner}.newFutureStub(clientsChannel) "
2729
}
2830

29-
val methodCases = getMethodCases(serviceType)
31+
val methodCases = getMethodCases(fType, serviceType)
3032

3133
val t =
3234
q"""
33-
new _root_.com.avast.grpc.jsonbridge.GrpcJsonBridge[$serviceTypeRaw] with _root_.com.avast.grpc.jsonbridge.GrpcJsonBridgeBase[$clientType] {
35+
new _root_.com.avast.grpc.jsonbridge.GrpcJsonBridge[$fType, $serviceTypeRaw] with _root_.com.avast.grpc.jsonbridge.GrpcJsonBridgeBase[$clientType] {
3436
import _root_.com.avast.grpc.jsonbridge._
3537
import _root_.cats.instances.future._
3638
import _root_.cats.data._
@@ -46,16 +48,18 @@ class Macros(val c: blackbox.Context) {
4648

4749
override def invokeGrpcMethod(name: String,
4850
json: => String,
49-
headers: => _root_.scala.Seq[com.avast.grpc.jsonbridge.GrpcJsonBridge.GrpcHeader]): scala.concurrent.Future[_root_.scala.Either[_root_.io.grpc.Status, String]] = {
50-
try {
51+
headers: => _root_.scala.Seq[com.avast.grpc.jsonbridge.GrpcJsonBridge.GrpcHeader]): $fType[_root_.scala.Either[_root_.io.grpc.Status, String]] = {
52+
val task = try {
5153
name match {
5254
case ..$methodCases
5355
// unsupported method
54-
case _ => scala.concurrent.Future.successful(_root_.scala.Left(_root_.io.grpc.Status.NOT_FOUND))
56+
case _ => monix.eval.Task.now(_root_.scala.Left(_root_.io.grpc.Status.NOT_FOUND))
5557
}
5658
} catch {
57-
case _root_.scala.util.control.NonFatal(e) => _root_.scala.concurrent.Future.successful(_root_.scala.Left(_root_.io.grpc.Status.INTERNAL))
59+
case _root_.scala.util.control.NonFatal(e) => _root_.monix.eval.Task.now(_root_.scala.Left(_root_.io.grpc.Status.INTERNAL))
5860
}
61+
62+
implicitly[_root_.cats.arrow.FunctionK[Task, $fType]].apply(task)
5963
}
6064

6165
override val serviceInfo: _root_.scala.Seq[String] = ${serviceInfo(serviceType)}
@@ -70,10 +74,10 @@ class Macros(val c: blackbox.Context) {
7074
}
7175
"""
7276

73-
c.Expr[GrpcJsonBridge[GrpcServiceStub]](t)
77+
c.Expr[GrpcJsonBridge[F, GrpcServiceStub]](t)
7478
}
7579

76-
private def getMethodCases(serviceType: c.Type): Iterable[c.Tree] = {
80+
private def getMethodCases(fType: Symbol, serviceType: Type): Iterable[c.Tree] = {
7781
serviceType.members
7882
.collect {
7983
case ApiMethod(m) => m
@@ -83,7 +87,7 @@ class Macros(val c: blackbox.Context) {
8387
cq"""
8488
${firstUpper(name.toString)} =>
8589
(for {
86-
request <- _root_.cats.data.EitherT.fromEither[_root_.scala.concurrent.Future](fromJson(${request.companion}.getDefaultInstance, json))
90+
request <- _root_.cats.data.EitherT.fromEither[_root_.monix.eval.Task](fromJson(${request.companion}.getDefaultInstance, json))
8791
result <- _root_.cats.data.EitherT {
8892
withNewClientStub(headers) { _.$name(request).asScala(executor).map(toJson(_)) }
8993
}

core/src/main/scala/com/avast/grpc/jsonbridge/jsonbridge.scala

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,30 @@ package com.avast.grpc
22

33
import java.util.concurrent.Executor
44

5+
import cats.arrow.FunctionK
56
import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture}
67
import io.grpc.stub.AbstractStub
78
import io.grpc.{BindableService, ServerInterceptor}
9+
import monix.eval.Task
810

911
import scala.concurrent.{ExecutionContext, Future, Promise}
1012
import scala.language.experimental.macros
13+
import scala.language.higherKinds
1114
import scala.reflect.ClassTag
1215

1316
package object jsonbridge {
1417

18+
type ToTask[A[_]] = FunctionK[A, Task]
19+
20+
implicit val fkTaskIdentity: FunctionK[Task, Task] = FunctionK.id
21+
1522
implicit class DeriveBridge[GrpcServiceStub <: BindableService](val serviceStub: GrpcServiceStub) extends AnyVal {
16-
def createGrpcJsonBridge[GrpcClientStub <: AbstractStub[GrpcClientStub]](interceptors: ServerInterceptor*)(
23+
def createGrpcJsonBridge[F[_], GrpcClientStub <: AbstractStub[GrpcClientStub]](interceptors: ServerInterceptor*)(
1724
implicit ec: ExecutionContext,
1825
ex: Executor,
19-
ct: ClassTag[GrpcServiceStub]): GrpcJsonBridge[GrpcServiceStub] =
20-
macro Macros.generateGrpcJsonBridge[GrpcServiceStub, GrpcClientStub]
26+
ct: ClassTag[GrpcServiceStub],
27+
ct2: ClassTag[F[_]]): GrpcJsonBridge[F, GrpcServiceStub] =
28+
macro Macros.generateGrpcJsonBridge[F, GrpcServiceStub, GrpcClientStub]
2129
}
2230

2331
implicit class ListenableFuture2ScalaFuture[T](val f: ListenableFuture[T]) extends AnyVal {

0 commit comments

Comments
 (0)