diff --git a/.travis.yml b/.travis.yml index 149cea9..64affc6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,5 +3,6 @@ services: - rabbitmq scala: - 2.11.8 + - 2.12.0 jdk: - oraclejdk8 diff --git a/README.md b/README.md index 381dfdb..4f6c6ad 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,7 @@ Source.fromPublisher(queue).map(_.message).runWith(Sink.fromSubscriber(exchange) API Docs ---- -Run `sbt doc` and open target/scala-2.11/index.html. +Run `sbt doc` and open target/scala-2.12/index.html. Settings ---- diff --git a/build.sbt b/build.sbt index 000cb4d..096682c 100644 --- a/build.sbt +++ b/build.sbt @@ -14,7 +14,9 @@ licenses := Seq("Apache License 2.0" -> url("http://opensource.org/licenses/Apac homepage := Some(url("https://github.com/ScalaConsultants/reactive-rabbit")) -scalaVersion := "2.11.8" +scalaVersion := "2.12.0" + +crossScalaVersions := Seq("2.11.8", "2.12.0") scalacOptions ++= Seq("-unchecked", "-deprecation", "-feature", "-Xfatal-warnings", "-target:jvm-1.8") @@ -25,7 +27,7 @@ libraryDependencies ++= Seq( "com.typesafe" % "config" % "1.3.0", // Configuration "com.google.guava" % "guava" % "19.0", // for MediaType "com.google.code.findbugs" % "jsr305" % "3.0.1", - "org.scalatest" %% "scalatest" % "2.2.6" % "test", // for TCK + "org.scalatest" %% "scalatest" % "3.0.1" % "test", // for TCK "org.reactivestreams" % "reactive-streams-tck" % "1.0.0" % "test", "com.typesafe.akka" %% "akka-stream" % "2.4.12" % "test" ) diff --git a/src/main/scala/io/scalac/amqp/ConnectionSettings.scala b/src/main/scala/io/scalac/amqp/ConnectionSettings.scala index dcdb855..2c2facb 100644 --- a/src/main/scala/io/scalac/amqp/ConnectionSettings.scala +++ b/src/main/scala/io/scalac/amqp/ConnectionSettings.scala @@ -43,8 +43,8 @@ object ConnectionSettings { /** Builds settings from TypeSafe Config. */ def apply(config: Config): ConnectionSettings = apply( addresses = { - import scala.collection.JavaConversions._ - config.getConfigList("amqp.addresses").map(address ⇒ + import scala.collection.JavaConverters._ + config.getConfigList("amqp.addresses").asScala.map(address ⇒ Address( host = address.getString("host"), port = address.getInt("port") diff --git a/src/main/scala/io/scalac/amqp/impl/Conversions.scala b/src/main/scala/io/scalac/amqp/impl/Conversions.scala index dd86815..8383d0a 100644 --- a/src/main/scala/io/scalac/amqp/impl/Conversions.scala +++ b/src/main/scala/io/scalac/amqp/impl/Conversions.scala @@ -1,6 +1,7 @@ package io.scalac.amqp.impl import java.time.{ZoneId, ZonedDateTime} +import java.util import java.util.Date import java.util.concurrent.TimeUnit @@ -9,7 +10,7 @@ import com.google.common.net.MediaType import com.rabbitmq.client.{AMQP, ConnectionFactory, Envelope} import io.scalac.amqp._ -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.concurrent.duration.Duration @@ -62,7 +63,7 @@ private object Conversions { body = body, contentType = Option(properties.getContentType).map(MediaType.parse), contentEncoding = Option(properties.getContentEncoding), - headers = Option(properties.getHeaders).map(_.toMap.mapValues(_.toString)).getOrElse(Map()), + headers = Option(properties.getHeaders).map(_.asScala.toMap.mapValues(_.toString)).getOrElse(Map()), mode = toDeliveryMode(properties.getDeliveryMode), priority = Option(properties.getPriority).map(Integer2int), correlationId = Option(properties.getCorrelationId), @@ -98,7 +99,7 @@ private object Conversions { new AMQP.BasicProperties.Builder() .contentType(message.contentType.map(_.toString).orNull) .contentEncoding(message.contentEncoding.orNull) - .headers(message.headers) + .headers(message.headers.asJava.asInstanceOf[util.Map[String, AnyRef]]) .deliveryMode(toDeliveryMode(message.mode)) .priority(message.priority.map(int2Integer).orNull) .correlationId(message.correlationId.orNull) diff --git a/src/main/scala/io/scalac/amqp/impl/RabbitConnection.scala b/src/main/scala/io/scalac/amqp/impl/RabbitConnection.scala index adbc5e1..7293f50 100644 --- a/src/main/scala/io/scalac/amqp/impl/RabbitConnection.scala +++ b/src/main/scala/io/scalac/amqp/impl/RabbitConnection.scala @@ -1,12 +1,13 @@ package io.scalac.amqp.impl import java.io.IOException +import java.util import com.rabbitmq.client.{Address, AlreadyClosedException, Channel} import io.scalac.amqp._ import org.reactivestreams.{Subscriber, Subscription} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.{Future, blocking} @@ -54,7 +55,7 @@ private[amqp] class RabbitConnection(settings: ConnectionSettings) extends Conne override def exchangeBind(destination: String, source: String, routingKey: String, arguments: Map[String, String]) = - future(onChannel(_.exchangeBind(destination, source, routingKey, arguments))) + future(onChannel(_.exchangeBind(destination, source, routingKey, arguments.asJava.asInstanceOf[util.Map[String, AnyRef]]))) .map(_ ⇒ Exchange.BindOk()) override def exchangeUnbind(destination: String, source: String, routingKey: String) = @@ -101,7 +102,7 @@ private[amqp] class RabbitConnection(settings: ConnectionSettings) extends Conne override def queueBind(queue: String, exchange: String, routingKey: String, arguments: Map[String, String]) = - future(onChannel(_.queueBind(queue, exchange, routingKey, arguments))) + future(onChannel(_.queueBind(queue, exchange, routingKey, arguments.asJava.asInstanceOf[util.Map[String, AnyRef]]))) .map(_ ⇒ Queue.BindOk()) override def queueUnbind(queue: String, exchange: String, routingKey: String) =