From 71ca6ed0079f9bc780d1d42874badc2fdf063fe7 Mon Sep 17 00:00:00 2001 From: Andrii Biletskyi Date: Tue, 29 Mar 2016 16:08:37 +0300 Subject: [PATCH 1/5] MK-2 - multiple cluster support --- kafka-mesos.properties | 4 +- src/scala/ly/stealth/mesos/kafka/Broker.scala | 25 ++- src/scala/ly/stealth/mesos/kafka/Cli.scala | 62 ++++--- .../ly/stealth/mesos/kafka/Cluster.scala | 166 +++-------------- .../ly/stealth/mesos/kafka/ClusterCli.scala | 156 ++++++++++++++++ src/scala/ly/stealth/mesos/kafka/Config.scala | 4 +- .../ly/stealth/mesos/kafka/Executor.scala | 2 +- src/scala/ly/stealth/mesos/kafka/Expr.scala | 18 +- .../ly/stealth/mesos/kafka/HttpServer.scala | 171 ++++++++++++++---- src/scala/ly/stealth/mesos/kafka/Nodes.scala | 105 +++++++++++ .../ly/stealth/mesos/kafka/Rebalancer.scala | 4 +- .../ly/stealth/mesos/kafka/Scheduler.scala | 33 ++-- .../ly/stealth/mesos/kafka/Storage.scala | 69 +++++++ src/scala/ly/stealth/mesos/kafka/Topics.scala | 4 +- .../ly/stealth/mesos/kafka/BrokerTest.scala | 10 +- src/test/ly/stealth/mesos/kafka/CliTest.scala | 120 ++++++++---- .../ly/stealth/mesos/kafka/ClusterTest.scala | 56 +++--- .../ly/stealth/mesos/kafka/ExprTest.scala | 87 +++++---- .../stealth/mesos/kafka/HttpServerTest.scala | 125 ++++++++----- .../stealth/mesos/kafka/MesosTestCase.scala | 27 +-- .../stealth/mesos/kafka/RebalancerTest.scala | 24 ++- .../stealth/mesos/kafka/SchedulerTest.scala | 34 ++-- .../ly/stealth/mesos/kafka/TopicsTest.scala | 4 +- 23 files changed, 865 insertions(+), 445 deletions(-) create mode 100644 src/scala/ly/stealth/mesos/kafka/ClusterCli.scala create mode 100644 src/scala/ly/stealth/mesos/kafka/Nodes.scala create mode 100644 src/scala/ly/stealth/mesos/kafka/Storage.scala diff --git a/kafka-mesos.properties b/kafka-mesos.properties index d71b826..7e47542 100644 --- a/kafka-mesos.properties +++ b/kafka-mesos.properties @@ -3,12 +3,10 @@ debug=true user=vagrant -storage=zk:/mesos-kafka-scheduler +storage=zk://master:2181/chroot/mesos-kafka-scheduler master=master:5050 -zk=master:2181/chroot - #for testing on the vagrant master via ./kafka-mesos.sh scheduler #you will eventually want to run this on a scheduler i.e marathon #change the IP to what is service discoverable & routable for your setup diff --git a/src/scala/ly/stealth/mesos/kafka/Broker.scala b/src/scala/ly/stealth/mesos/kafka/Broker.scala index bf6510f..a40c915 100644 --- a/src/scala/ly/stealth/mesos/kafka/Broker.scala +++ b/src/scala/ly/stealth/mesos/kafka/Broker.scala @@ -23,18 +23,17 @@ import org.apache.mesos.Protos.Resource.DiskInfo.Persistence import org.apache.mesos.Protos.Volume.Mode import scala.collection.JavaConversions._ -import scala.collection import org.apache.mesos.Protos.{Volume, Value, Resource, Offer} -import java.util._ +import java.util.{TimeZone, Collections, UUID, Date} import ly.stealth.mesos.kafka.Broker.{Metrics, Stickiness, Failover} import ly.stealth.mesos.kafka.Util.{BindAddress, Period, Range, Str} import java.text.SimpleDateFormat -import scala.List -import scala.collection.Map import scala.util.parsing.json.JSONObject class Broker(_id: String = "0") { var id: String = _id + var cluster: Cluster = null + @volatile var active: Boolean = false var cpus: Double = 1 @@ -57,6 +56,16 @@ class Broker(_id: String = "0") { // broker has been modified while being in non stopped state, once stopped or before task launch becomes false var needsRestart: Boolean = false + def this(id: String, cluster: Cluster){ + this(id) + this.cluster = cluster + } + + def this(json: Map[String, Any], expanded: 
Boolean) = {
+    this()
+    fromJson(json, expanded)
+  }
+
   def options(defaults: util.Map[String, String] = null): util.Map[String, String] = {
     val result = new util.LinkedHashMap[String, String]()
     if (defaults != null) result.putAll(defaults)
@@ -243,8 +252,9 @@ class Broker(_id: String = "0") {
     matches
   }
 
-  def fromJson(node: Map[String, Object]): Unit = {
+  def fromJson(node: Map[String, Any], expanded: Boolean = false): Unit = {
     id = node("id").asInstanceOf[String]
+    cluster = if (expanded) new Cluster(node("cluster").asInstanceOf[Map[String, Any]]) else Nodes.getCluster(node("cluster").asInstanceOf[String])
     active = node("active").asInstanceOf[Boolean]
 
     cpus = node("cpus").asInstanceOf[Number].doubleValue()
@@ -276,9 +286,10 @@ class Broker(_id: String = "0") {
     if (node.contains("needsRestart")) needsRestart = node("needsRestart").asInstanceOf[Boolean]
   }
 
-  def toJson: JSONObject = {
+  def toJson(expanded: Boolean = false): JSONObject = {
     val obj = new collection.mutable.LinkedHashMap[String, Any]()
     obj("id") = id
+    obj("cluster") = if (expanded) cluster.toJson else cluster.id
     obj("active") = active
 
     obj("cpus") = cpus
@@ -315,7 +326,7 @@ object Broker {
 
   def idFromExecutorId(executorId: String): String = idFromTaskId(executorId)
 
-  def isOptionOverridable(name: String): Boolean = !List("broker.id", "port", "zookeeper.connect").contains(name)
+  def isOptionOverridable(name: String): Boolean = !scala.List("broker.id", "port", "zookeeper.connect").contains(name)
 
   class Stickiness(_period: Period = new Period("10m")) {
     var period: Period = _period
diff --git a/src/scala/ly/stealth/mesos/kafka/Cli.scala b/src/scala/ly/stealth/mesos/kafka/Cli.scala
index d297096..46f5d19 100644
--- a/src/scala/ly/stealth/mesos/kafka/Cli.scala
+++ b/src/scala/ly/stealth/mesos/kafka/Cli.scala
@@ -67,6 +67,7 @@ object Cli {
     cmd match {
       case "topic" => TopicCli.handle(subCmd, args)
       case "broker" => BrokerCli.handle(subCmd, args)
+      case "cluster" => ClusterCli.handle(subCmd, args)
       case _ => throw new Error("unsupported command " + cmd)
     }
   }
@@ -132,13 +133,13 @@ object Cli {
     Util.formatMap(map)
   }
 
-  private def newParser(): OptionParser = {
+  private[kafka] def newParser(): OptionParser = {
     val parser: OptionParser = new OptionParser()
     parser.formatHelpWith(new BuiltinHelpFormatter(Util.terminalWidth, 2))
     parser
   }
 
-  private def printCmds(): Unit = {
+  private[kafka] def printCmds(): Unit = {
     printLine("Commands:")
     printLine("help [cmd [cmd]] - print general or command-specific help", 1)
     if (SchedulerCli.isEnabled) printLine("scheduler - start scheduler", 1)
+    printLine("cluster - cluster management commands", 1)
     printLine("topic - topic management commands", 1)
 
-  private def printLine(s: Object = "", indent: Int = 0): Unit = out.println(" " * indent + s)
+  private[kafka] def printLine(s: Object = "", indent: Int = 0): Unit = out.println(" " * indent + s)
 
   private[kafka] def resolveApi(apiOption: String): Unit = {
     if (api != null) return
@@ -231,7 +232,7 @@ object Cli {
       parser.accepts("storage",
         """Storage for cluster state. Examples:
           | - file:kafka-mesos.json
-          | - zk:/kafka-mesos
+          | - zk://master:2181/kafka-mesos
           |Default - """.stripMargin + Config.storage)
        .withRequiredArg().ofType(classOf[String])
@@ -264,19 +265,12 @@ object Cli {
      parser.accepts("framework-timeout", "Framework timeout (30s, 1m, 1h). Default - " + Config.frameworkTimeout)
        .withRequiredArg().ofType(classOf[String])
 
-      parser.accepts("api", "Api url. 
Example: http://master:7000") .withRequiredArg().ofType(classOf[String]) parser.accepts("bind-address", "Scheduler bind address (master, 0.0.0.0, 192.168.50.*, if:eth1). Default - all") .withRequiredArg().ofType(classOf[String]) - parser.accepts("zk", - """Kafka zookeeper.connect. Examples: - | - master:2181 - | - master:2181,master2:2181""".stripMargin) - .withRequiredArg().ofType(classOf[String]) - parser.accepts("jre", "JRE zip-file (jre-7-openjdk.zip). Default - none.") .withRequiredArg().ofType(classOf[String]) @@ -354,10 +348,6 @@ object Cli { try { Config.bindAddress = new BindAddress(bindAddress) } catch { case e: IllegalArgumentException => throw new Error("Invalid bind-address") } - val zk = options.valueOf("zk").asInstanceOf[String] - if (zk != null) Config.zk = zk - else if (Config.zk == null) throw new Error(s"Undefined zk. $provideOption") - val jre = options.valueOf("jre").asInstanceOf[String] if (jre != null) Config.jre = new File(jre) if (Config.jre != null && !Config.jre.exists()) throw new Error("JRE file doesn't exists") @@ -450,7 +440,7 @@ object Cli { for (brokerNode <- brokerNodes) { val broker = new Broker() - broker.fromJson(brokerNode) + broker.fromJson(brokerNode, expanded = true) printBroker(broker, 1) printLine() @@ -459,6 +449,8 @@ object Cli { private def handleAddUpdate(expr: String, args: Array[String], add: Boolean, help: Boolean = false): Unit = { val parser = newParser() + parser.accepts("cluster", "Cluster id").withRequiredArg().ofType(classOf[String]) + parser.accepts("cpus", "cpu amount (0.5, 1, 2)").withRequiredArg().ofType(classOf[java.lang.Double]) parser.accepts("mem", "mem amount in Mb").withRequiredArg().ofType(classOf[java.lang.Long]) parser.accepts("heap", "heap amount in Mb").withRequiredArg().ofType(classOf[java.lang.Long]) @@ -503,6 +495,7 @@ object Cli { throw new Error(e.getMessage) } + val cluster = options.valueOf("cluster").asInstanceOf[String] val cpus = options.valueOf("cpus").asInstanceOf[java.lang.Double] val mem = options.valueOf("mem").asInstanceOf[java.lang.Long] val heap = options.valueOf("heap").asInstanceOf[java.lang.Long] @@ -523,6 +516,7 @@ object Cli { val params = new util.LinkedHashMap[String, String] params.put("broker", expr) + if (cluster != null) params.put("cluster", cluster) if (cpus != null) params.put("cpus", "" + cpus) if (mem != null) params.put("mem", "" + mem) if (heap != null) params.put("heap", "" + heap) @@ -551,7 +545,7 @@ object Cli { printLine(s"$brokers $addedUpdated:") for (brokerNode <- brokerNodes) { val broker: Broker = new Broker() - broker.fromJson(brokerNode) + broker.fromJson(brokerNode, expanded = true) printBroker(broker, 1) printLine() @@ -631,7 +625,7 @@ object Cli { for (brokerNode <- brokerNodes) { val broker: Broker = new Broker() - broker.fromJson(brokerNode) + broker.fromJson(brokerNode, expanded = true) printBroker(broker, 1) printLine() @@ -684,7 +678,7 @@ object Cli { for (brokerNode <- brokerNodes) { val broker: Broker = new Broker() - broker.fromJson(brokerNode) + broker.fromJson(brokerNode, expanded = true) printBroker(broker, 1) printLine() @@ -754,6 +748,7 @@ object Cli { private def printBroker(broker: Broker, indent: Int): Unit = { printLine("id: " + broker.id, indent) + printLine("cluster: " + broker.cluster.id, indent) printLine("active: " + broker.active, indent) printLine("state: " + broker.state() + (if (broker.needsRestart) " (modified, needs restart)" else ""), indent) printLine("resources: " + brokerResources(broker), indent) @@ -841,7 +836,7 @@ object Cli { } cmd 
match {
-      case "list" => handleList(arg)
+      case "list" => handleList(arg, args)
       case "add" | "update" => handleAddUpdate(arg, args, cmd == "add")
       case "rebalance" => handleRebalance(arg, args)
       case _ => throw new Error("unsupported topic command " + cmd)
@@ -857,7 +852,7 @@ object Cli {
         printLine()
         printLine("Run `help topic <cmd>` to see details of specific command")
       case "list" =>
-        handleList(null, help = true)
+        handleList(null, null, help = true)
       case "add" | "update" =>
         handleAddUpdate(null, null, cmd == "add", help = true)
       case "rebalance" =>
@@ -867,9 +862,12 @@ object Cli {
     }
   }
 
-  def handleList(expr: String, help: Boolean = false): Unit = {
+  def handleList(expr: String, args: Array[String], help: Boolean = false): Unit = {
+    val parser = newParser()
+    parser.accepts("cluster", "Cluster id").withRequiredArg().ofType(classOf[String])
+
     if (help) {
-      printLine("List topics\nUsage: topic list [<topic-expr>]\n")
+      printLine("List topics\nUsage: topic list [<topic-expr>] --cluster <cluster-id>\n")
       handleGenericOptions(null, help = true)
 
       printLine()
@@ -878,8 +876,20 @@ object Cli {
       return
     }
 
+    var options: OptionSet = null
+    try { options = parser.parse(args: _*) }
+    catch {
+      case e: OptionException =>
+        parser.printHelpOn(out)
+        printLine()
+        throw new Error(e.getMessage)
+    }
+
+    val cluster = options.valueOf("cluster").asInstanceOf[String]
+
     val params = new util.LinkedHashMap[String, String]
     if (expr != null) params.put("topic", expr)
+    if (cluster != null) params.put("cluster", cluster)
 
     var json: Map[String, Object] = null
     try { json = sendRequest("/topic/list", params) }
@@ -908,6 +918,7 @@ object Cli {
       parser.accepts("replicas", "replicas count. Default - 1").withRequiredArg().ofType(classOf[Integer])
     }
     parser.accepts("options", "topic options. Example: flush.ms=60000,retention.ms=6000000").withRequiredArg().ofType(classOf[String])
+    parser.accepts("cluster", "Cluster id").withRequiredArg().ofType(classOf[String])
 
     if (help) {
       printLine(s"${cmd.capitalize} topic\nUsage: topic $cmd <topic-expr> [options]\n")
@@ -936,6 +947,7 @@ object Cli {
       throw new Error(e.getMessage)
     }
 
+    val cluster = options.valueOf("cluster").asInstanceOf[String]
     val broker = options.valueOf("broker").asInstanceOf[String]
     val partitions = options.valueOf("partitions").asInstanceOf[Integer]
     val replicas = options.valueOf("replicas").asInstanceOf[Integer]
@@ -943,6 +955,7 @@ object Cli {
 
     val params = new util.LinkedHashMap[String, String]
     params.put("topic", name)
+    if (cluster != null) params.put("cluster", cluster)
    if (broker != null) params.put("broker", broker)
    if (partitions != null) params.put("partitions", "" + partitions)
    if (replicas != null) params.put("replicas", "" + replicas)
@@ -972,6 +985,7 @@ object Cli {
     parser.accepts("broker", "<broker-expr>. Default - *. See below.").withRequiredArg().ofType(classOf[String])
     parser.accepts("replicas", "replicas count. Default - 1").withRequiredArg().ofType(classOf[Integer])
     parser.accepts("timeout", "timeout (30s, 1m, 1h). 0s - no timeout").withRequiredArg().ofType(classOf[String])
+    parser.accepts("cluster", "Cluster id").withRequiredArg().ofType(classOf[String])
 
     if (help) {
       printLine("Rebalance topics\nUsage: topic rebalance <topic-expr>|status [options]\n")
@@ -1000,12 +1014,14 @@ object Cli {
     val broker: String = options.valueOf("broker").asInstanceOf[String]
     val replicas: Integer = options.valueOf("replicas").asInstanceOf[Integer]
     val timeout: String = options.valueOf("timeout").asInstanceOf[String]
+    val cluster: String = options.valueOf("cluster").asInstanceOf[String]
 
     val params = new util.LinkedHashMap[String, String]()
     if (exprOrStatus != "status") params.put("topic", exprOrStatus)
     if (broker != null) params.put("broker", broker)
     if (replicas != null) params.put("replicas", "" + replicas)
     if (timeout != null) params.put("timeout", timeout)
+    if (cluster != null) params.put("cluster", cluster)
 
     var json: Map[String, Object] = null
     try { json = sendRequest("/topic/rebalance", params) }
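For orientation, this is the multi-cluster CLI flow the changes above add up to. A hypothetical session; the cluster id `dc1` and the zookeeper address are made-up examples, while the flag spellings follow the parsers defined in this patch:

    ./kafka-mesos.sh cluster add dc1 --zk-connect master:2181/kafka-dc1
    ./kafka-mesos.sh broker add 0 --cluster dc1 --cpus 0.5 --mem 256
    ./kafka-mesos.sh topic add t0 --cluster dc1
    ./kafka-mesos.sh topic list --cluster dc1

Note that `--cluster` is required when adding a broker, and that topic commands now need a `--cluster` to pick the zookeeper ensemble to talk to.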
0s - no timeout").withRequiredArg().ofType(classOf[String]) + parser.accepts("cluster", "Cluster id").withRequiredArg().ofType(classOf[String]) if (help) { printLine("Rebalance topics\nUsage: topic rebalance |status [options]\n") @@ -1000,12 +1014,14 @@ object Cli { val broker: String = options.valueOf("broker").asInstanceOf[String] val replicas: Integer = options.valueOf("replicas").asInstanceOf[Integer] val timeout: String = options.valueOf("timeout").asInstanceOf[String] + val cluster: String = options.valueOf("cluster").asInstanceOf[String] val params = new util.LinkedHashMap[String, String]() if (exprOrStatus != "status") params.put("topic", exprOrStatus) if (broker != null) params.put("broker", broker) if (replicas != null) params.put("replicas", "" + replicas) if (timeout != null) params.put("timeout", timeout) + if (cluster != null) params.put("cluster", cluster) var json: Map[String, Object] = null try { json = sendRequest("/topic/rebalance", params) } diff --git a/src/scala/ly/stealth/mesos/kafka/Cluster.scala b/src/scala/ly/stealth/mesos/kafka/Cluster.scala index 87453e9..ae84a1d 100644 --- a/src/scala/ly/stealth/mesos/kafka/Cluster.scala +++ b/src/scala/ly/stealth/mesos/kafka/Cluster.scala @@ -1,159 +1,51 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-
 package ly.stealth.mesos.kafka
 
-import java.util
-import scala.util.parsing.json.{JSONArray, JSONObject}
-import scala.collection.JavaConversions._
 import scala.collection.mutable
-import scala.collection.mutable.ListBuffer
-import java.util.Collections
-import java.io.{FileWriter, File}
-import org.I0Itec.zkclient.ZkClient
-import kafka.utils.ZKStringSerializer
-import org.I0Itec.zkclient.exception.ZkNodeExistsException
+import scala.util.parsing.json.JSONObject
 
 class Cluster {
-  private val brokers: util.List[Broker] = new util.concurrent.CopyOnWriteArrayList[Broker]()
-  private[kafka] var rebalancer: Rebalancer = new Rebalancer()
-  private[kafka] var topics: Topics = new Topics()
-  private[kafka] var frameworkId: String = null
+  var id: String = null
+  var zkConnect: String = null
 
-  def getBrokers:util.List[Broker] = Collections.unmodifiableList(brokers)
+  private[kafka] var topics: Topics = new Topics(() => zkConnect)
+  private[kafka] var rebalancer: Rebalancer = new Rebalancer(() => zkConnect)
 
-  def getBroker(id: String): Broker = {
-    for (broker <- brokers)
-      if (broker.id == id) return broker
-    null
+  def this(_id: String) {
+    this()
+    id = _id
   }
 
-  def addBroker(broker: Broker): Broker = {
-    brokers.add(broker)
-    broker
+  def this(json: Map[String, Any]) {
+    this()
+    fromJson(json)
   }
 
-  def removeBroker(broker: Broker): Unit = brokers.remove(broker)
-
-  def clear(): Unit = brokers.clear()
-
-  def load() = Cluster.storage.load(this)
-  def save() = Cluster.storage.save(this)
+  def getBrokers: List[Broker] = Nodes.getBrokers.filter(_.cluster == this)
 
-  def fromJson(root: Map[String, Object]): Unit = {
-    if (root.contains("brokers")) {
-      for (brokerNode <- root("brokers").asInstanceOf[List[Map[String, Object]]]) {
-        val broker: Broker = new Broker()
-        broker.fromJson(brokerNode)
-        brokers.add(broker)
-      }
-    }
+  def active: Boolean = getBrokers.exists(_.active)
+  def idle: Boolean = !active
 
-    if (root.contains("frameworkId"))
-      frameworkId = root("frameworkId").asInstanceOf[String]
+  def fromJson(json: Map[String, Any]): Unit = {
+    id = json("id").asInstanceOf[String]
+    if (json.contains("zkConnect"))
+      zkConnect = json("zkConnect").asInstanceOf[String]
   }
 
   def toJson: JSONObject = {
-    val obj = new mutable.LinkedHashMap[String, Object]()
-
-    if (!brokers.isEmpty) {
-      val brokerNodes = new ListBuffer[JSONObject]()
-      for (broker <- brokers)
-        brokerNodes.add(broker.toJson)
-      obj("brokers") = new JSONArray(brokerNodes.toList)
-    }
-
-    if (frameworkId != null) obj("frameworkId") = frameworkId
-    new JSONObject(obj.toMap)
-  }
-}
-
-object Cluster {
-  var storage: Storage = newStorage(Config.storage)
+    val json = new mutable.LinkedHashMap[String, Any]()
+    json("id") = id
+    if (zkConnect != null)
+      json("zkConnect") = zkConnect
 
-  def newStorage(s: String): Storage = {
-    if (s.startsWith("file:")) return new FsStorage(new File(s.substring("file:".length)))
-    else if (s.startsWith("zk:")) return new ZkStorage(s.substring("zk:".length))
-    throw new IllegalStateException("Unsupported storage " + s)
+    new JSONObject(json.toMap)
   }
 
-  abstract class Storage {
-    def load(cluster: Cluster): Unit = {
-      val json: String = loadJson
-      if (json == null) return
-
-      val node: Map[String, Object] = Util.parseJson(json)
-      cluster.brokers.clear()
-      cluster.fromJson(node)
-    }
-
-    def save(cluster: Cluster): Unit = {
-      saveJson("" + cluster.toJson)
-    }
-
-    protected def loadJson: String
-    protected def saveJson(json: String): Unit
-  }
+  override def hashCode(): Int = id.hashCode
 
-  class FsStorage(val file: File) extends Storage {
-    protected def loadJson: String = {
-      if (!file.exists) return null
-      val source = scala.io.Source.fromFile(file)
-      try source.mkString finally source.close()
-    }
-
-    protected def saveJson(json: String): Unit = {
-      val writer = new FileWriter(file)
-      try { writer.write(json) }
-      finally { writer.close() }
-    }
+  override def equals(obj: scala.Any): Boolean = {
+    if (!obj.isInstanceOf[Cluster]) false
+    else id == obj.asInstanceOf[Cluster].id
   }
 
-  object FsStorage {
-    val DEFAULT_FILE: File = new File("kafka-mesos.json")
-  }
-
-  class ZkStorage(val path: String) extends Storage {
-    createChrootIfRequired()
-
-    def zkClient: ZkClient = new ZkClient(Config.zk, 30000, 30000, ZKStringSerializer)
-
-    private def createChrootIfRequired(): Unit = {
-      val slashIdx: Int = Config.zk.indexOf('/')
-      if (slashIdx == -1) return
-
-      val chroot = Config.zk.substring(slashIdx)
-      val zkConnect = Config.zk.substring(0, slashIdx)
-
-      val client = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
-      try { client.createPersistent(chroot, true) }
-      finally { client.close() }
-    }
-
-    protected def loadJson: String = {
-      val zkClient = this.zkClient
-      try { zkClient.readData(path, true).asInstanceOf[String] }
-      finally { zkClient.close() }
-    }
-
-    protected def saveJson(json: String): Unit = {
-      val zkClient = this.zkClient
-      try { zkClient.createPersistent(path, json) }
-      catch { case e: ZkNodeExistsException => zkClient.writeData(path, json) }
-      finally { zkClient.close() }
-    }
-  }
-}
\ No newline at end of file
+  override def toString: String = id
+}
diff --git a/src/scala/ly/stealth/mesos/kafka/ClusterCli.scala b/src/scala/ly/stealth/mesos/kafka/ClusterCli.scala
new file mode 100644
index 0000000..c2fba19
--- /dev/null
+++ b/src/scala/ly/stealth/mesos/kafka/ClusterCli.scala
@@ -0,0 +1,156 @@
+package ly.stealth.mesos.kafka
+
+import java.io.IOException
+import java.util
+import java.util.Collections
+import Cli._
+import joptsimple.{OptionException, OptionSet}
+
+object ClusterCli {
+  def handle(cmd: String, _args: Array[String], help: Boolean = false): Unit = {
+    var args = _args
+
+    if (help) {
+      handleHelp(cmd)
+      return
+    }
+
+    var arg: String = null
+    if (args.length > 0 && !args(0).startsWith("-")) {
+      arg = args(0)
+      args = args.slice(1, args.length)
+    }
+
+    if (arg == null && cmd != "list") {
+      handleHelp(cmd); printLine()
+      throw new Error("argument required")
+    }
+
+    cmd match {
+      case "list" => handleList(arg)
+      case "add" | "update" => handleAddUpdate(arg, args, cmd == "add")
+      case "remove" => handleRemove(arg)
+      case _ => throw new Error("unsupported cluster command " + cmd)
+    }
+  }
+
+  private def handleHelp(cmd: String): Unit = {
+    cmd match {
+      case null =>
+        printLine("Cluster management commands\nUsage: cluster <cmd>\n")
+        printCmds()
+
+        printLine()
+        printLine("Run `help cluster <cmd>` to see details of specific command")
+      case "list" =>
+        handleList(null, help = true)
+      case "add" | "update" =>
+        handleAddUpdate(null, null, cmd == "add", help = true)
+      case "remove" =>
+        handleRemove(null, help = true)
+      case _ =>
+        throw new Error(s"unsupported cluster command $cmd")
+    }
+  }
+
+  private def handleList(expr: String, help: Boolean = false): Unit = {
+    if (help) {
+      printLine("List clusters\nUsage: cluster list\n")
+      handleGenericOptions(null, help = true)
+
+      printLine()
+
+      return
+    }
+
+    var json: Map[String, Object] = null
+    try { json = sendRequest("/cluster/list", new util.HashMap[String, String]) }
+    catch { case e: IOException => throw new Error("" + e) }
+
+    val clusters = json("clusters").asInstanceOf[List[Map[String, Object]]]
+    val title = if (clusters.isEmpty) "no clusters" else "cluster" + (if (clusters.size > 1) "s" else "") + ":"
+    printLine(title)
+
+    for (clusterJson <- clusters) {
+      val cluster = new Cluster()
+      cluster.fromJson(clusterJson)
+
+      printCluster(cluster, 1)
+      printLine()
+    }
+  }
+
+  private def printCluster(cluster: Cluster, indent: Int): Unit = {
+    printLine("id: " + cluster.id, indent)
+    printLine("zk connection string: " + cluster.zkConnect, indent)
+  }
+
+  private def handleAddUpdate(id: String, args: Array[String], add: Boolean, help: Boolean = false): Unit = {
+    val parser = newParser()
+    parser.accepts("zk-connect", "REQUIRED. Connection string to Kafka Zookeeper cluster. E.g.: 192.168.0.1:2181,192.168.0.2:2181/kafka1")
+      .withRequiredArg().ofType(classOf[String])
+
+    if (help) {
+      val cmd = if (add) "add" else "update"
+      printLine(s"${cmd.capitalize} cluster\nUsage: cluster $cmd <id> [options]\n")
+      parser.printHelpOn(out)
+
+      printLine()
+      handleGenericOptions(null, help = true)
+
+      if (!add) printLine("\nNote: use \"\" arg to unset an option")
+      return
+    }
+
+    var options: OptionSet = null
+    try {
+      options = parser.parse(args: _*)
+    } catch {
+      case e: OptionException =>
+        parser.printHelpOn(out)
+        printLine()
+        throw new Error(e.getMessage)
+    }
+
+    val zkConnect = options.valueOf("zk-connect").asInstanceOf[String]
+
+    val params = new util.LinkedHashMap[String, String]
+    params.put("cluster", id)
+
+    if (zkConnect != null) params.put("zkConnect", zkConnect)
+
+    var json: Map[String, Object] = null
+    try {
+      json = sendRequest("/cluster/" + (if (add) "add" else "update"), params)
+    } catch {
+      case e: IOException => throw new Error("" + e)
+    }
+
+    val clusters = json("clusters").asInstanceOf[List[Map[String, Object]]]
+    val op = if (add) "added" else "updated"
+    printLine(s"cluster $op:")
+    for (clusterJson <- clusters) {
+      val cluster = new Cluster(clusterJson)
+
+      printCluster(cluster, 1)
+      printLine()
+    }
+  }
+
+  private def handleRemove(id: String, help: Boolean = false): Unit = {
+    if (help) {
+      printLine("Remove cluster\nUsage: cluster remove <id>\n")
+      handleGenericOptions(null, help = true)
+
+      return
+    }
+
+    var json: Map[String, Object] = null
+    try { json = sendRequest("/cluster/remove", Collections.singletonMap("cluster", id)) }
+    catch { case e: IOException => throw new Error("" + e) }
+
+    val resId = json("id").asInstanceOf[String]
+
+    printLine(s"cluster $resId removed")
+  }
+}
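A quick sketch of how the new Cluster entity round-trips through its JSON form. This is not part of the patch; it leans only on Cluster.toJson/fromJson added above and the project's existing Util.parseJson helper:

    // hypothetical REPL session against the classes added in this commit
    val cluster = new Cluster("dc1")
    cluster.zkConnect = "master:2181/kafka-dc1"

    val json = "" + cluster.toJson                    // {"id" : "dc1", "zkConnect" : "master:2181/kafka-dc1"}
    val restored = new Cluster(Util.parseJson(json))  // parseJson yields a Map, consumed by the json constructor
    assert(restored == cluster)                       // Cluster equality is by id only (see equals above)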
json("clusters").asInstanceOf[List[Map[String, Object]]] + val title = if (clusters.isEmpty) "no clusters" else "cluster" + (if (clusters.size > 1) "s" else "") + ":" + printLine(title) + + for (clusterJson <- clusters) { + val cluster = new Cluster() + cluster.fromJson(clusterJson) + + printCluster(cluster, 1) + printLine() + } + } + + private def printCluster(cluster: Cluster, indent: Int): Unit = { + printLine("id: " + cluster.id, indent) + printLine("zk connection string: " + cluster.zkConnect, indent) + } + + private def handleAddUpdate(id: String, args: Array[String], add: Boolean, help: Boolean = false): Unit = { + val parser = newParser() + parser.accepts("zk-connect", "REQUIRED. Connection string to Kafka Zookeeper cluster. E.g.: 192.168.0.1:2181,192.168.0.2:2181/kafka1") + .withRequiredArg().ofType(classOf[String]) + + if (help) { + val cmd = if (add) "add" else "update" + printLine(s"${cmd.capitalize} cluster\nUsage: cluster $cmd [options]\n") + parser.printHelpOn(out) + + printLine() + handleGenericOptions(null, help = true) + + if (!add) printLine("\nNote: use \"\" arg to unset an option") + return + } + + var options: OptionSet = null + try { + options = parser.parse(args: _*) + } catch { + case e: OptionException => + parser.printHelpOn(out) + printLine() + throw new Error(e.getMessage) + } + + val zkConnect = options.valueOf("zk-connect").asInstanceOf[String] + + val params = new util.LinkedHashMap[String, String] + params.put("cluster", id) + + if (zkConnect != null) params.put("zkConnect", zkConnect) + + var json: Map[String, Object] = null + try { + json = sendRequest("/cluster/" + (if (add) "add" else "update"), params) + } catch { + case e: IOException => throw new Error("" + e) + } + + val clusters = json("clusters").asInstanceOf[List[Map[String, Object]]] + val op = if (add) "added" else "updated" + printLine(s"cluster $op:") + for (clusterJson <- clusters) { + val cluster = new Cluster(clusterJson) + + printCluster(cluster, 1) + printLine() + } + } + + private def handleRemove(id: String, help: Boolean = false): Unit = { + if (help) { + printLine("Remove cluster\nUsage: cluster remove \n") + handleGenericOptions(null, help = true) + + return + } + + var json: Map[String, Object] = null + try { json = sendRequest("/cluster/remove", Collections.singletonMap("cluster", id)) } + catch { case e: IOException => throw new Error("" + e) } + + val resId = json("id").asInstanceOf[String] + + printLine(s"cluster $resId removed") + } +} diff --git a/src/scala/ly/stealth/mesos/kafka/Config.scala b/src/scala/ly/stealth/mesos/kafka/Config.scala index 66d8b4e..2ec78fb 100644 --- a/src/scala/ly/stealth/mesos/kafka/Config.scala +++ b/src/scala/ly/stealth/mesos/kafka/Config.scala @@ -41,7 +41,6 @@ object Config { var log: File = null var api: String = null var bindAddress: BindAddress = null - var zk: String = null def apiPort: Int = { val port = new URI(api).getPort @@ -80,7 +79,6 @@ object Config { if (props.containsKey("log")) log = new File(props.getProperty("log")) if (props.containsKey("api")) api = props.getProperty("api") if (props.containsKey("bind-address")) bindAddress = new BindAddress(props.getProperty("bind-address")) - if (props.containsKey("zk")) zk = props.getProperty("zk") } override def toString: String = { @@ -88,7 +86,7 @@ object Config { |debug: $debug, storage: $storage |mesos: master=$master, user=${if (user == null || user.isEmpty) "" else user}, principal=${if (principal != null) principal else ""}, secret=${if (secret != null) "*****" else ""} |framework: 
name=$frameworkName, role=$frameworkRole, timeout=$frameworkTimeout - |api: $api, bind-address: ${if (bindAddress != null) bindAddress else ""}, zk: $zk, jre: ${if (jre == null) "" else jre} + |api: $api, bind-address: ${if (bindAddress != null) bindAddress else ""}, jre: ${if (jre == null) "" else jre} """.stripMargin.trim } } diff --git a/src/scala/ly/stealth/mesos/kafka/Executor.scala b/src/scala/ly/stealth/mesos/kafka/Executor.scala index f668072..4b4ef46 100644 --- a/src/scala/ly/stealth/mesos/kafka/Executor.scala +++ b/src/scala/ly/stealth/mesos/kafka/Executor.scala @@ -71,7 +71,7 @@ object Executor extends org.apache.mesos.Executor { try { val data: util.Map[String, String] = Util.parseMap(task.getData.toStringUtf8) val broker = new Broker() - broker.fromJson(Util.parseJson(data.get("broker"))) + broker.fromJson(Util.parseJson(data.get("broker")), expanded = true) val defaults = Util.parseMap(data.get("defaults")) val endpoint = server.start(broker, defaults) diff --git a/src/scala/ly/stealth/mesos/kafka/Expr.scala b/src/scala/ly/stealth/mesos/kafka/Expr.scala index 3bda5f5..ddc75a7 100644 --- a/src/scala/ly/stealth/mesos/kafka/Expr.scala +++ b/src/scala/ly/stealth/mesos/kafka/Expr.scala @@ -9,7 +9,7 @@ import ly.stealth.mesos.kafka.Broker.Task import java.lang.Comparable object Expr { - def expandBrokers(cluster: Cluster, _expr: String, sortByAttrs: Boolean = false): util.List[String] = { + def expandBrokers(_expr: String, sortByAttrs: Boolean = false): util.List[String] = { var expr: String = _expr var attributes: util.Map[String, String] = null @@ -27,7 +27,7 @@ object Expr { val part = _part.trim() if (part.equals("*")) - for (broker <- cluster.getBrokers) ids.add(broker.id) + for (broker <- Nodes.getBrokers) ids.add(broker.id) else if (part.contains("..")) { val idx = part.indexOf("..") @@ -53,12 +53,12 @@ object Expr { ids = new util.ArrayList[String](ids.distinct.sorted.toList) if (attributes != null) - filterAndSortBrokersByAttrs(cluster, ids, attributes, sortByAttrs) + filterAndSortBrokersByAttrs(ids, attributes, sortByAttrs) ids } - private def filterAndSortBrokersByAttrs(cluster: Cluster, ids: util.Collection[String], attributes: util.Map[String, String], sortByAttrs: Boolean): Unit = { + private def filterAndSortBrokersByAttrs(ids: util.Collection[String], attributes: util.Map[String, String], sortByAttrs: Boolean): Unit = { def brokerAttr(broker: Broker, name: String): String = { if (broker == null || broker.task == null) return null @@ -90,7 +90,7 @@ object Expr { val iterator = ids.iterator() while (iterator.hasNext) { val id = iterator.next() - val broker = cluster.getBroker(id) + val broker = Nodes.getBroker(id) if (!brokerMatches(broker)) iterator.remove() @@ -119,7 +119,7 @@ object Expr { val values = new util.HashMap[Value, util.List[String]]() for (id <- ids) { - val broker: Broker = cluster.getBroker(id) + val broker: Broker = Nodes.getBroker(id) if (broker != null) { val value = new Value(broker) @@ -161,10 +161,10 @@ object Expr { out.println(" 0..4[rack=r1,dc=dc1] - any broker having rack=r1 and dc=dc1") } - def expandTopics(expr: String): util.List[String] = { + def expandTopics(expr: String, cluster: Cluster): util.List[String] = { val topics = new util.TreeSet[String]() - val zkClient = newZkClient + val zkClient = newZkClient(cluster.zkConnect) var allTopics: util.List[String] = null try { allTopics = ZkUtils.getAllTopics(zkClient) } finally { zkClient.close() } @@ -188,5 +188,5 @@ object Expr { out.println(" t* - topics starting with 't'") } - private 
def newZkClient: ZkClient = new ZkClient(Config.zk, 30000, 30000, ZKStringSerializer)
+  private def newZkClient(zkConnect: String): ZkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
 }
diff --git a/src/scala/ly/stealth/mesos/kafka/HttpServer.scala b/src/scala/ly/stealth/mesos/kafka/HttpServer.scala
index ba1028d..506c500 100644
--- a/src/scala/ly/stealth/mesos/kafka/HttpServer.scala
+++ b/src/scala/ly/stealth/mesos/kafka/HttpServer.scala
@@ -124,6 +124,7 @@ object HttpServer {
     else if (uri.startsWith("/health")) handleHealth(response)
     else if (uri.startsWith("/api/broker")) handleBrokerApi(request, response)
     else if (uri.startsWith("/api/topic")) handleTopicApi(request, response)
+    else if (uri.startsWith("/api/cluster")) handleClusterApi(request, response)
     else response.sendError(404, "uri not found")
   }
@@ -139,6 +140,76 @@ object HttpServer {
     response.getWriter.println("ok")
   }
 
+  def handleClusterApi(request: HttpServletRequest, response: HttpServletResponse) {
+    response.setContentType("application/json; charset=utf-8")
+    request.setAttribute("jsonResponse", true)
+    var uri: String = request.getRequestURI.substring("/api/cluster".length)
+    if (uri.startsWith("/")) uri = uri.substring(1)
+
+    if (uri == "list") handleListClusters(request, response)
+    else if (uri == "add" || uri == "update") handleAddUpdateCluster(uri == "add", request, response)
+    else if (uri == "remove") handleRemoveCluster(request, response)
+    else response.sendError(404, "unsupported method")
+  }
+
+  private def handleListClusters(request: HttpServletRequest, response: HttpServletResponse) {
+    val clustersJson = Nodes.getClusters.map(_.toJson)
+    response.getWriter.println("" + new JSONObject(Map("clusters" -> new JSONArray(clustersJson))))
+  }
+
+  private def handleAddUpdateCluster(add: Boolean, request: HttpServletRequest, response: HttpServletResponse) {
+    val id: String = request.getParameter("cluster")
+    val zkConnect: String = request.getParameter("zkConnect")
+
+    if (id == null || id.isEmpty) {
+      response.sendError(404, "cluster required")
+      return
+    }
+
+    val errors = new util.ArrayList[String]()
+    var cluster = Nodes.getCluster(id)
+    if (add && cluster != null)
+      errors.add("duplicate cluster")
+    if (!add && cluster == null)
+      errors.add("cluster not found")
+    if (!add && cluster != null && cluster.active)
+      errors.add("cluster has active nodes")
+
+    if (add && (zkConnect == null || zkConnect.trim.isEmpty))
+      errors.add("zkConnect required")
+
+    if (!errors.isEmpty) { response.sendError(400, errors.mkString("; ")); return }
+
+    if (add)
+      cluster = Nodes.addCluster(new Cluster(id))
+    if (zkConnect != null) cluster.zkConnect = zkConnect
+
+    Nodes.save()
+    response.getWriter.println("" + new JSONObject(Map("clusters" -> new JSONArray(List(cluster.toJson)))))
+  }
+
+  private def handleRemoveCluster(request: HttpServletRequest, response: HttpServletResponse) {
+    val id: String = request.getParameter("cluster")
+    if (id == null || id.isEmpty) {
+      response.sendError(404, "cluster required")
+      return
+    }
+    val cluster = Nodes.getCluster(id)
+    if (cluster == null) {
+      response.sendError(404, "cluster not found")
+      return
+    }
+    if (cluster.getBrokers.nonEmpty) {
+      response.sendError(404, "can't remove cluster with nodes, remove nodes first")
+      return
+    }
+
+    Nodes.removeCluster(cluster)
+    Nodes.save()
+
+    response.getWriter.println(JSONObject(Map("id" -> id)))
+  }
+
   def handleBrokerApi(request: HttpServletRequest, response: HttpServletResponse): Unit = {
     request.setAttribute("jsonResponse", true)
     response.setContentType("application/json; charset=utf-8")
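To make the new endpoints concrete, a hypothetical exchange with the cluster handlers above (ids and hosts are invented; the response shapes follow the JSONObject values built in the code):

    /api/cluster/add?cluster=dc1&zkConnect=master:2181/kafka-dc1
      => {"clusters" : [{"id" : "dc1", "zkConnect" : "master:2181/kafka-dc1"}]}
    /api/cluster/list
      => {"clusters" : [{"id" : "dc1", "zkConnect" : "master:2181/kafka-dc1"}]}
    /api/cluster/remove?cluster=dc1
      => {"id" : "dc1"}   (404 while the cluster still has brokers)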
@@ -155,29 +226,38 @@ object HttpServer {
   }
 
   def handleListBrokers(request: HttpServletRequest, response: HttpServletResponse): Unit = {
-    val cluster = Scheduler.cluster
-
     var expr = request.getParameter("broker")
     if (expr == null) expr = "*"
 
     var ids: util.List[String] = null
-    try { ids = Expr.expandBrokers(cluster, expr) }
+    try { ids = Expr.expandBrokers(expr) }
     catch { case e: IllegalArgumentException => response.sendError(400, "invalid broker-expr"); return }
 
     val brokerNodes = new ListBuffer[JSONObject]()
 
     for (id <- ids) {
-      val broker = cluster.getBroker(id)
-      if (broker != null) brokerNodes.add(cluster.getBroker(id).toJson)
+      val broker = Nodes.getBroker(id)
+      if (broker != null) brokerNodes.add(broker.toJson(expanded = true))
     }
 
     response.getWriter.println("" + new JSONObject(Map("brokers" -> new JSONArray(brokerNodes.toList))))
   }
 
   def handleAddUpdateBroker(request: HttpServletRequest, response: HttpServletResponse): Unit = {
-    val cluster = Scheduler.cluster
+    val clusterId = request.getParameter("cluster")
+    var cluster: Cluster = null
+    if (clusterId != null) {
+      cluster = Nodes.getCluster(clusterId)
+      if (cluster == null) {
+        response.sendError(404, s"cluster $clusterId not found")
+        return
+      }
+    }
+
     val add: Boolean = request.getRequestURI.endsWith("add")
 
     val errors = new util.ArrayList[String]()
 
+    if (add && cluster == null) errors.add("cluster required")
+
     val expr: String = request.getParameter("broker")
     if (expr == null || expr.isEmpty) errors.add("broker required")
 
@@ -250,13 +330,13 @@ object HttpServer {
     if (!errors.isEmpty) { response.sendError(400, errors.mkString("; ")); return }
 
     var ids: util.List[String] = null
-    try { ids = Expr.expandBrokers(cluster, expr) }
+    try { ids = Expr.expandBrokers(expr) }
     catch { case e: IllegalArgumentException => 
response.sendError(400, "invalid broker-expr"); return } val brokers = new util.ArrayList[Broker]() for (id <- ids) { - val broker = Scheduler.cluster.getBroker(id) + val broker = Nodes.getBroker(id) if (broker == null) { response.sendError(400, s"broker $id not found"); return } if (broker.active) { response.sendError(400, s"broker $id is active"); return } brokers.add(broker) } - brokers.foreach(cluster.removeBroker) - cluster.save() + brokers.foreach(Nodes.removeBroker) + Nodes.save() val result = new collection.mutable.LinkedHashMap[String, Any]() result("ids") = ids.mkString(",") @@ -326,7 +407,6 @@ object HttpServer { } def handleStartStopBroker(request: HttpServletRequest, response: HttpServletResponse): Unit = { - val cluster: Cluster = Scheduler.cluster val start: Boolean = request.getRequestURI.endsWith("start") var timeout: Period = new Period("60s") @@ -340,12 +420,12 @@ object HttpServer { if (expr == null) { response.sendError(400, "broker required"); return } var ids: util.List[String] = null - try { ids = Expr.expandBrokers(cluster, expr) } + try { ids = Expr.expandBrokers(expr) } catch { case e: IllegalArgumentException => response.sendError(400, "invalid broker-expr"); return } val brokers = new util.ArrayList[Broker]() for (id <- ids) { - val broker = cluster.getBroker(id) + val broker = Nodes.getBroker(id) if (broker == null) { response.sendError(400, "broker " + id + " not found"); return } if (!force && broker.active == start) { response.sendError(400, "broker " + id + " is" + (if (start) "" else " not") + " active"); return } brokers.add(broker) @@ -356,7 +436,7 @@ object HttpServer { broker.failover.resetFailures() if (!start && force) Scheduler.forciblyStopBroker(broker) } - cluster.save() + Nodes.save() def waitForBrokers(): String = { if (timeout.ms == 0) return "scheduled" @@ -371,13 +451,11 @@ object HttpServer { val status = waitForBrokers() val brokerNodes = new ListBuffer[JSONObject]() - for (broker <- brokers) brokerNodes.add(broker.toJson) + for (broker <- brokers) brokerNodes.add(broker.toJson(expanded = true)) response.getWriter.println(JSONObject(Map("status" -> status, "brokers" -> new JSONArray(brokerNodes.toList)))) } def handleRestartBroker(request: HttpServletRequest, response: HttpServletResponse): Unit = { - val cluster: Cluster = Scheduler.cluster - var timeout: Period = new Period("2m") if (request.getParameter("timeout") != null) try { timeout = new Period(request.getParameter("timeout")) } @@ -387,12 +465,12 @@ object HttpServer { if (expr == null) { response.sendError(400, "broker required"); return } var ids: util.List[String] = null - try { ids = Expr.expandBrokers(cluster, expr) } + try { ids = Expr.expandBrokers(expr) } catch { case e: IllegalArgumentException => response.sendError(400, "invalid broker-expr"); return } val brokers = new util.ArrayList[Broker]() for (id <- ids) { - val broker = cluster.getBroker(id) + val broker = Nodes.getBroker(id) if (broker == null) { response.sendError(400, s"broker $id not found"); return } if (!broker.active || broker.task == null || !broker.task.running) { response.sendError(400, s"broker $id is not running"); return } brokers.add(broker) @@ -408,7 +486,7 @@ object HttpServer { broker.active = false broker.failover.resetFailures() val begin = System.currentTimeMillis() - cluster.save() + Nodes.save() if (!broker.waitFor(null, timeout)) { response.getWriter.println("" + timeoutJson(broker, "stop")); return } @@ -416,17 +494,15 @@ object HttpServer { // start broker.active = true - cluster.save() + 
Nodes.save() if (!broker.waitFor(State.RUNNING, startTimeout)) { response.getWriter.println("" + timeoutJson(broker, "start")); return } } - response.getWriter.println(JSONObject(Map("status" -> "restarted", "brokers" -> new JSONArray(brokers.map(_.toJson).toList)))) + response.getWriter.println(JSONObject(Map("status" -> "restarted", "brokers" -> new JSONArray(brokers.map(_.toJson(expanded = true)).toList)))) } def handleBrokerLog(request: HttpServletRequest, response: HttpServletResponse): Unit = { - val cluster: Cluster = Scheduler.cluster - var timeout: Period = new Period("30s") if (request.getParameter("timeout") != null) try { timeout = new Period(request.getParameter("timeout")) } @@ -444,7 +520,7 @@ object HttpServer { catch { case e: NumberFormatException => response.sendError(400, "invalid lines"); return } if (lines <= 0) { response.sendError(400, "lines has to be greater than 0"); return } - val broker = cluster.getBroker(id) + val broker = Nodes.getBroker(id) if (broker == null) { response.sendError(400, "broker " + id + " not found"); return } if (!broker.active) { response.sendError(400, "broker " + id + " is not active"); return } if (broker.task == null || !broker.task.running) { response.sendError(400, "broker " + id + " is not running"); return } @@ -487,11 +563,16 @@ object HttpServer { } def handleListTopics(request: HttpServletRequest, response: HttpServletResponse): Unit = { - val topics: Topics = Scheduler.cluster.topics + val cluster = Option(request.getParameter("cluster")).map(Nodes.getCluster).orNull + if (cluster == null){ + response.sendError(404, "cluster param is empty, or cluster doesn't exist") + return + } + val topics: Topics = cluster.topics var expr = request.getParameter("topic") if (expr == null) expr = "*" - val names: util.List[String] = Expr.expandTopics(expr) + val names: util.List[String] = Expr.expandTopics(expr, cluster) val topicNodes = new ListBuffer[JSONObject]() for (topic <- topics.getTopics) @@ -502,17 +583,23 @@ object HttpServer { } def handleAddUpdateTopic(request: HttpServletRequest, response: HttpServletResponse): Unit = { - val topics: Topics = Scheduler.cluster.topics + val cluster = Option(request.getParameter("cluster")).map(Nodes.getCluster).orNull + if (cluster == null){ + response.sendError(404, "cluster param is empty, or cluster doesn't exist") + return + } + val topics: Topics = cluster.topics + val add: Boolean = request.getRequestURI.endsWith("add") val errors = new util.ArrayList[String]() val topicExpr: String = request.getParameter("topic") if (topicExpr == null || topicExpr.isEmpty) errors.add("topic required") - val topicNames: util.List[String] = Expr.expandTopics(topicExpr) + val topicNames: util.List[String] = Expr.expandTopics(topicExpr, cluster) var brokerIds: util.List[Int] = null if (request.getParameter("broker") != null) - try { brokerIds = Expr.expandBrokers(Scheduler.cluster, request.getParameter("broker"), sortByAttrs = true).map(Integer.parseInt) } + try { brokerIds = Expr.expandBrokers(request.getParameter("broker"), sortByAttrs = true).map(Integer.parseInt) } catch { case e: IllegalArgumentException => errors.add("Invalid broker-expr") } var partitions: Int = 1 @@ -556,13 +643,17 @@ object HttpServer { } def handleTopicRebalance(request: HttpServletRequest, response: HttpServletResponse): Unit = { - val cluster: Cluster = Scheduler.cluster + val cluster = Option(request.getParameter("cluster")).map(Nodes.getCluster).orNull + if (cluster == null){ + response.sendError(404, "cluster param is empty, or 
cluster doesn't exist") + return + } val rebalancer: Rebalancer = cluster.rebalancer val topicExpr = request.getParameter("topic") var topics: util.List[String] = null if (topicExpr != null) - try { topics = Expr.expandTopics(topicExpr)} + try { topics = Expr.expandTopics(topicExpr, cluster)} catch { case e: IllegalArgumentException => response.sendError(400, "invalid topics"); return } if (topics != null && rebalancer.running) { response.sendError(400, "rebalance is already running"); return } @@ -571,7 +662,7 @@ object HttpServer { val brokerExpr: String = if (request.getParameter("broker") != null) request.getParameter("broker") else "*" var brokers: util.List[String] = null if (brokerExpr != null) - try { brokers = Expr.expandBrokers(cluster, brokerExpr, sortByAttrs = true) } + try { brokers = Expr.expandBrokers(brokerExpr, sortByAttrs = true) } catch { case e: IllegalArgumentException => response.sendError(400, "invalid broker-expr"); return } var timeout: Period = new Period("0") diff --git a/src/scala/ly/stealth/mesos/kafka/Nodes.scala b/src/scala/ly/stealth/mesos/kafka/Nodes.scala new file mode 100644 index 0000000..49db95d --- /dev/null +++ b/src/scala/ly/stealth/mesos/kafka/Nodes.scala @@ -0,0 +1,105 @@ +package ly.stealth.mesos.kafka + +import java.io.File + +import org.apache.log4j.Logger + +import scala.collection.mutable +import scala.util.parsing.json.{JSONArray, JSONObject} + +object Nodes { + private[kafka] val logger = Logger.getLogger(this.getClass) + private[kafka] var storage = newStorage(Config.storage) + + var frameworkId: String = null + val clusters = new mutable.ListBuffer[Cluster] + val brokers = new mutable.ListBuffer[Broker] + + reset() + + def getClusters: List[Cluster] = clusters.toList + + def getCluster(id: String): Cluster = clusters.find(id == _.id).orNull + + def addCluster(cluster: Cluster): Cluster = { + if (getCluster(cluster.id) != null) + throw new IllegalArgumentException(s"duplicate cluster ${cluster.id}") + + clusters += cluster + cluster + } + + def removeCluster(cluster: Cluster): Unit = { + clusters -= cluster + } + + + def getBrokers: List[Broker] = brokers.toList.sortBy(_.id.toInt) + + def getBroker(id: String) = brokers.find(id == _.id).orNull + + def addBroker(broker: Broker): Broker = { + if (getBroker(broker.id) != null) throw new IllegalArgumentException(s"duplicate node ${broker.id}") + brokers += broker + broker + } + + def removeBroker(broker: Broker): Unit = { brokers -= broker } + + def reset(): Unit = { + frameworkId = null + + clear() + } + + def clear(): Unit = { + brokers.clear() + clusters.clear() + } + + def load() = storage.load() + def save() = storage.save() + + def fromJson(json: Map[String, Any]): Unit = { + if (json.contains("clusters")) { + clusters.clear() + for (clusterObj <- json("clusters").asInstanceOf[List[Map[String, Object]]]) + addCluster(new Cluster(clusterObj)) + } + + if (json.contains("brokers")) { + brokers.clear() + for (brokerNode <- json("brokers").asInstanceOf[List[Map[String, Object]]]) { + val broker: Broker = new Broker() + broker.fromJson(brokerNode) + brokers.append(broker) + } + } + + if (json.contains("frameworkId")) + frameworkId = json("frameworkId").asInstanceOf[String] + } + + def toJson: JSONObject = { + val json = new mutable.LinkedHashMap[String, Object]() + if (frameworkId != null) json("frameworkId") = frameworkId + + if (clusters.nonEmpty) { + val clustersJson = clusters.map(_.toJson) + json("clusters") = new JSONArray(clustersJson.toList) + } + + if (brokers.nonEmpty) { + val 
brokersJson = brokers.map(_.toJson())
+      json("brokers") = new JSONArray(brokersJson.toList)
+    }
+
+    new JSONObject(json.toMap)
+  }
+
+  def newStorage(s: String): Storage = {
+    if (s.startsWith("file:")) new FsStorage(new File(s.substring("file:".length)))
+    else if (s.startsWith("zk://")) new ZkStorage(s.substring("zk://".length))
+    else throw new IllegalStateException("Unsupported storage " + s)
+  }
+}
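Since the storage string now carries the ZooKeeper address itself, here is a quick sketch of how it is split. Not part of the patch; it assumes the `zk://` scheme handling in newStorage above and mirrors the `span` call in ZkStorage further below:

    // "zk://master:2181/chroot/mesos-kafka-scheduler" with the "zk://" scheme stripped:
    val zk = "master:2181/chroot/mesos-kafka-scheduler"
    val (zkConnect, path) = zk.span(_ != '/')   // split at the first '/'
    // zkConnect == "master:2181"               - host list handed to ZkClient
    // path == "/chroot/mesos-kafka-scheduler"  - znode that stores the state json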
diff --git a/src/scala/ly/stealth/mesos/kafka/Rebalancer.scala b/src/scala/ly/stealth/mesos/kafka/Rebalancer.scala
index ac84e7a..1cb5c90 100644
--- a/src/scala/ly/stealth/mesos/kafka/Rebalancer.scala
+++ b/src/scala/ly/stealth/mesos/kafka/Rebalancer.scala
@@ -32,13 +32,13 @@ import kafka.utils.{ZkUtils, ZKStringSerializer}
 import ly.stealth.mesos.kafka.Util.Period
 import org.apache.log4j.Logger
 
-class Rebalancer {
+class Rebalancer(zkConnect: () => String) {
   private val logger: Logger = Logger.getLogger(this.getClass)
 
   @volatile private var assignment: Map[TopicAndPartition, Seq[Int]] = null
   @volatile private var reassignment: Map[TopicAndPartition, Seq[Int]] = null
 
-  private def newZkClient: ZkClient = new ZkClient(Config.zk, 30000, 30000, ZKStringSerializer)
+  private def newZkClient: ZkClient = new ZkClient(zkConnect(), 30000, 30000, ZKStringSerializer)
 
   def running: Boolean = {
     val zkClient = newZkClient
diff --git a/src/scala/ly/stealth/mesos/kafka/Scheduler.scala b/src/scala/ly/stealth/mesos/kafka/Scheduler.scala
index d657a52..1ab8340 100644
--- a/src/scala/ly/stealth/mesos/kafka/Scheduler.scala
+++ b/src/scala/ly/stealth/mesos/kafka/Scheduler.scala
@@ -30,7 +30,6 @@ import ly.stealth.mesos.kafka.Util.{Version, Period, Str}
 
 object Scheduler extends org.apache.mesos.Scheduler {
   private val logger: Logger = Logger.getLogger(this.getClass)
-  val cluster: Cluster = new Cluster()
   private var driver: SchedulerDriver = null
 
   val logs = new ConcurrentHashMap[Long, Option[String]]()
@@ -69,7 +68,7 @@ object Scheduler extends org.apache.mesos.Scheduler {
       "log.dirs" -> "kafka-logs",
       "log.retention.bytes" -> ("" + 10l * 1024 * 1024 * 1024),
 
-      "zookeeper.connect" -> Config.zk,
+      "zookeeper.connect" -> broker.cluster.zkConnect,
       "host.name" -> offer.getHostname
     )
 
@@ -80,7 +79,7 @@ object Scheduler extends org.apache.mesos.Scheduler {
       defaults += ("log.dirs" -> "data/kafka-logs")
 
     val data = new util.HashMap[String, String]()
-    data.put("broker", "" + broker.toJson)
+    data.put("broker", "" + broker.toJson(expanded = true))
     data.put("defaults", Util.formatMap(defaults))
     ByteString.copyFromUtf8(Util.formatMap(data))
   }
@@ -99,8 +98,8 @@ object Scheduler extends org.apache.mesos.Scheduler {
   def registered(driver: SchedulerDriver, id: FrameworkID, master: MasterInfo): Unit = {
     logger.info("[registered] framework:" + Str.id(id.getValue) + " master:" + Str.master(master))
 
-    cluster.frameworkId = id.getValue
-    cluster.save()
+    Nodes.frameworkId = id.getValue
+    Nodes.save()
 
     this.driver = driver
     reconcileTasksIfRequired(force = true)
@@ -129,7 +128,7 @@ object Scheduler extends org.apache.mesos.Scheduler {
   def frameworkMessage(driver: SchedulerDriver, executorId: ExecutorID, slaveId: SlaveID, data: Array[Byte]): Unit = {
     logger.info("[frameworkMessage] executor:" + Str.id(executorId.getValue) + " slave:" + Str.id(slaveId.getValue) + " data: " + new String(data))
 
-    val broker = cluster.getBroker(Broker.idFromExecutorId(executorId.getValue))
+    val broker = Nodes.getBroker(Broker.idFromExecutorId(executorId.getValue))
 
     try {
       val node: Map[String, Object] = Util.parseJson(new String(data))
@@ -187,7 +186,7 @@ object Scheduler extends org.apache.mesos.Scheduler {
     if (!declineReasons.isEmpty) logger.info("Declined offers:\n" + declineReasons.mkString("\n"))
 
-    for (broker <- cluster.getBrokers) {
+    for (broker <- Nodes.getBrokers) {
       if (broker.shouldStop) {
         logger.info(s"Stopping broker ${broker.id}: killing task ${broker.task.id}")
         driver.killTask(TaskID.newBuilder.setValue(broker.task.id).build)
@@ -196,7 +195,7 @@ object Scheduler extends org.apache.mesos.Scheduler {
     }
 
     reconcileTasksIfRequired()
-    cluster.save()
+    Nodes.save()
   }
 
   private[kafka] def acceptOffer(offer: Offer): String = {
@@ -204,7 +203,7 @@ object Scheduler extends org.apache.mesos.Scheduler {
     val now = new Date()
 
     var reason = ""
-    for (broker <- cluster.getBrokers.filter(_.shouldStart(offer.getHostname))) {
+    for (broker <- Nodes.getBrokers.filter(_.shouldStart(offer.getHostname))) {
       val diff = broker.matches(offer, now, otherTasksAttributes)
 
       if (diff == null) {
@@ -220,7 +219,7 @@ object Scheduler extends org.apache.mesos.Scheduler {
   }
 
   private[kafka] def onBrokerStatus(status: TaskStatus): Unit = {
-    val broker = cluster.getBroker(Broker.idFromTaskId(status.getTaskId.getValue))
+    val broker = Nodes.getBroker(Broker.idFromTaskId(status.getTaskId.getValue))
 
     status.getState match {
       case TaskState.TASK_RUNNING =>
@@ -232,7 +231,7 @@ object Scheduler extends org.apache.mesos.Scheduler {
       case _ => logger.warn("Got unexpected task state: " + status.getState)
     }
 
-    cluster.save()
+    Nodes.save()
   }
 
   private[kafka] def onBrokerStarted(broker: Broker, status: TaskStatus): Unit = {
@@ -280,7 +279,7 @@ object Scheduler extends org.apache.mesos.Scheduler {
     broker.needsRestart = false
   }
 
-  private def isReconciling: Boolean = cluster.getBrokers.exists(b => b.task != null && b.task.reconciling)
+  private def isReconciling: Boolean = Nodes.getBrokers.exists(b => b.task != null && b.task.reconciling)
 
   private[kafka] def launchTask(broker: Broker, offer: Offer): Unit = {
     broker.needsRestart = false
@@ -326,7 +325,7 @@ object Scheduler extends org.apache.mesos.Scheduler {
     reconcileTime = now
 
     if (reconciles > RECONCILE_MAX_TRIES) {
-      for (broker <- cluster.getBrokers.filter(b => b.task != null && b.task.reconciling)) {
+      for (broker <- Nodes.getBrokers.filter(b => b.task != null && b.task.reconciling)) {
         logger.info(s"Reconciling exceeded $RECONCILE_MAX_TRIES tries for broker ${broker.id}, sending killTask for task ${broker.task.id}")
         driver.killTask(TaskID.newBuilder().setValue(broker.task.id).build())
         broker.task = null
@@ -337,7 +336,7 @@ object Scheduler extends org.apache.mesos.Scheduler {
 
     val statuses = new util.ArrayList[TaskStatus]
 
-    for (broker <- cluster.getBrokers.filter(_.task != null))
+    for (broker <- Nodes.getBrokers.filter(_.task != null))
       if (force || broker.task.reconciling) {
         broker.task.state = Broker.State.RECONCILING
         logger.info(s"Reconciling $reconciles/$RECONCILE_MAX_TRIES state of broker ${broker.id}, task ${broker.task.id}")
@@ -360,7 +359,7 @@ object Scheduler extends org.apache.mesos.Scheduler {
     }
 
     val values = new util.ArrayList[String]()
-    for (broker <- cluster.getBrokers)
+    for (broker <- Nodes.getBrokers)
       if (broker.task != null) {
         val v = value(broker.task, name)
         if (v != null) values.add(v)
@@ -373,12 +372,12 @@ object Scheduler extends org.apache.mesos.Scheduler {
     initLogging()
     logger.info(s"Starting ${getClass.getSimpleName}:\n$Config")
 
-    cluster.load()
+    Nodes.load()
     HttpServer.start()
 
     val frameworkBuilder = FrameworkInfo.newBuilder()
     frameworkBuilder.setUser(if (Config.user != null) Config.user else "")
-    if 
(cluster.frameworkId != null) frameworkBuilder.setId(FrameworkID.newBuilder().setValue(cluster.frameworkId)) + if (Nodes.frameworkId != null) frameworkBuilder.setId(FrameworkID.newBuilder().setValue(Nodes.frameworkId)) frameworkBuilder.setRole(Config.frameworkRole) frameworkBuilder.setName(Config.frameworkName) diff --git a/src/scala/ly/stealth/mesos/kafka/Storage.scala b/src/scala/ly/stealth/mesos/kafka/Storage.scala new file mode 100644 index 0000000..1ea52e1 --- /dev/null +++ b/src/scala/ly/stealth/mesos/kafka/Storage.scala @@ -0,0 +1,69 @@ +package ly.stealth.mesos.kafka + +import java.io.{FileWriter, File} + +import kafka.utils.ZKStringSerializer +import org.I0Itec.zkclient.ZkClient +import org.I0Itec.zkclient.exception.ZkNodeExistsException + +trait Storage { + def load(): Unit = { + val json: String = loadJson + if (json == null) return + + val node: Map[String, Object] = Util.parseJson(json) + Nodes.fromJson(node) + } + + def save(): Unit = { + saveJson("" + Nodes.toJson) + } + + protected def loadJson: String + protected def saveJson(json: String): Unit +} + +class FsStorage(val file: File) extends Storage { + protected def loadJson: String = { + if (!file.exists) return null + val source = scala.io.Source.fromFile(file) + try source.mkString finally source.close() + } + + protected def saveJson(json: String): Unit = { + val writer = new FileWriter(file) + try { writer.write(json) } + finally { writer.close() } + } +} + +object FsStorage { + val DEFAULT_FILE: File = new File("kafka-mesos.json") +} + +class ZkStorage(val zk: String) extends Storage { + val (zkConnect, path) = zk.span(_ != '/') + createChrootIfRequired() + + def zkClient: ZkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) + + private def createChrootIfRequired(): Unit = { + if (path != "") { + try { zkClient.createPersistent(path, true) } + finally { zkClient.close() } + } + } + + protected def loadJson: String = { + val zkClient = this.zkClient + try { zkClient.readData(path, true).asInstanceOf[String] } + finally { zkClient.close() } + } + + protected def saveJson(json: String): Unit = { + val zkClient = this.zkClient + try { zkClient.createPersistent(path, json) } + catch { case e: ZkNodeExistsException => zkClient.writeData(path, json) } + finally { zkClient.close() } + } +} diff --git a/src/scala/ly/stealth/mesos/kafka/Topics.scala b/src/scala/ly/stealth/mesos/kafka/Topics.scala index d2ad99e..4e400d4 100644 --- a/src/scala/ly/stealth/mesos/kafka/Topics.scala +++ b/src/scala/ly/stealth/mesos/kafka/Topics.scala @@ -31,8 +31,8 @@ import scala.util.parsing.json.JSONObject import ly.stealth.mesos.kafka.Topics.Topic import kafka.log.LogConfig -class Topics { - private def newZkClient: ZkClient = new ZkClient(Config.zk, 30000, 30000, ZKStringSerializer) +class Topics(zkConnect: () => String) { + private def newZkClient: ZkClient = new ZkClient(zkConnect(), 30000, 30000, ZKStringSerializer) def getTopic(name: String): Topics.Topic = { if (name == null) return null diff --git a/src/test/ly/stealth/mesos/kafka/BrokerTest.scala b/src/test/ly/stealth/mesos/kafka/BrokerTest.scala index 1ce94b9..1116e89 100644 --- a/src/test/ly/stealth/mesos/kafka/BrokerTest.scala +++ b/src/test/ly/stealth/mesos/kafka/BrokerTest.scala @@ -32,6 +32,7 @@ class BrokerTest extends MesosTestCase { override def before { super.before broker = new Broker("0") + broker.cluster = testCluster broker.cpus = 0 broker.mem = 0 } @@ -328,6 +329,7 @@ class BrokerTest extends MesosTestCase { @Test def toJson_fromJson { + broker.cluster = 
testCluster broker.active = true broker.cpus = 0.5 broker.mem = 128 @@ -345,9 +347,12 @@ class BrokerTest extends MesosTestCase { broker.task = new Task("1", "slave", "executor", "host") val read: Broker = new Broker() - read.fromJson(Util.parseJson("" + broker.toJson)) - + read.fromJson(Util.parseJson("" + broker.toJson())) BrokerTest.assertBrokerEquals(broker, read) + + val readExpanded: Broker = new Broker() + readExpanded.fromJson(Util.parseJson("" + broker.toJson(expanded = true)), expanded = true) + BrokerTest.assertBrokerEquals(broker, readExpanded) } // static part @@ -531,6 +536,7 @@ object BrokerTest { def assertBrokerEquals(expected: Broker, actual: Broker) { if (checkNulls(expected, actual)) return + assertEquals(expected.cluster.id, actual.cluster.id) assertEquals(expected.id, actual.id) assertEquals(expected.active, actual.active) diff --git a/src/test/ly/stealth/mesos/kafka/CliTest.scala b/src/test/ly/stealth/mesos/kafka/CliTest.scala index 7a4e693..8234af0 100644 --- a/src/test/ly/stealth/mesos/kafka/CliTest.scala +++ b/src/test/ly/stealth/mesos/kafka/CliTest.scala @@ -64,9 +64,9 @@ class CliTest extends MesosTestCase { @Test def broker_list{ - Scheduler.cluster.addBroker(new Broker("0")) - Scheduler.cluster.addBroker(new Broker("1")) - Scheduler.cluster.addBroker(new Broker("2")) + Nodes.addBroker(new Broker("0", testCluster)) + Nodes.addBroker(new Broker("1", testCluster)) + Nodes.addBroker(new Broker("2", testCluster)) exec("broker list") assertOutContains("brokers:") @@ -75,7 +75,7 @@ class CliTest extends MesosTestCase { assertOutContains("id: 2") // when broker needs restart - val broker = Scheduler.cluster.getBroker("0") + val broker = Nodes.getBroker("0") broker.needsRestart = true exec("broker list") assertOutContains("(modified, needs restart)") @@ -83,20 +83,20 @@ class CliTest extends MesosTestCase { @Test def broker_add { - exec("broker add 0 --cpus=0.1 --mem=128") + exec("broker add 0 --cpus=0.1 --mem=128 --cluster default") assertOutContains("broker added:") assertOutContains("id: 0") assertOutContains("cpus:0.10, mem:128") - assertEquals(1, Scheduler.cluster.getBrokers.size()) - val broker = Scheduler.cluster.getBroker("0") + assertEquals(1, Nodes.getBrokers.size) + val broker = Nodes.getBroker("0") assertEquals(0.1, broker.cpus, 0.001) assertEquals(128, broker.mem) } @Test def broker_update { - val broker = Scheduler.cluster.addBroker(new Broker("0")) + val broker = Nodes.addBroker(new Broker("0", testCluster)) exec("broker update 0 --failover-delay=10s --failover-max-delay=20s --options=log.dirs=/tmp/kafka-logs") assertOutContains("broker updated:") @@ -110,17 +110,17 @@ class CliTest extends MesosTestCase { @Test def broker_remove { - Scheduler.cluster.addBroker(new Broker("0")) + Nodes.addBroker(new Broker("0", testCluster)) exec("broker remove 0") assertOutContains("broker 0 removed") - assertNull(Scheduler.cluster.getBroker("0")) + assertNull(Nodes.getBroker("0")) } @Test def broker_start_stop { - val broker0 = Scheduler.cluster.addBroker(new Broker("0")) - val broker1 = Scheduler.cluster.addBroker(new Broker("1")) + val broker0 = Nodes.addBroker(new Broker("0", testCluster)) + val broker1 = Nodes.addBroker(new Broker("1", testCluster)) exec("broker start * --timeout=0") assertOutContains("brokers scheduled to start:") @@ -144,7 +144,7 @@ class CliTest extends MesosTestCase { @Test def broker_start_stop_timeout { - val broker = Scheduler.cluster.addBroker(new Broker("0")) + val broker = Nodes.addBroker(new Broker("0", testCluster)) try { exec("broker 
start 0 --timeout=1ms"); fail() } catch { case e: Cli.Error => assertTrue(e.getMessage, e.getMessage.contains("broker start timeout")) } assertTrue(broker.active) @@ -161,7 +161,7 @@ class CliTest extends MesosTestCase { assertCliErrorContains("broker log 0", "broker 0 not found") // broker isn't active or running - val broker = Scheduler.cluster.addBroker(new Broker("0")) + val broker = Nodes.addBroker(new Broker("0", testCluster)) assertCliErrorContains("broker log 0", "broker 0 is not active") broker.active = true @@ -227,8 +227,8 @@ class CliTest extends MesosTestCase { assertOutContains("Usage: broker restart [options]") assertOutContains("--timeout") - val broker0 = Scheduler.cluster.addBroker(new Broker("0")) - val broker1 = Scheduler.cluster.addBroker(new Broker("1")) + val broker0 = Nodes.addBroker(new Broker("0", testCluster)) + val broker1 = Nodes.addBroker(new Broker("1", testCluster)) def started(broker: Broker) { Scheduler.resourceOffers(schedulerDriver, Seq(offer(resources = "cpus:2.0;mem:2048;ports:9042..65000", hostname = "slave" + broker.id))) @@ -243,7 +243,7 @@ class CliTest extends MesosTestCase { assertNull(broker.task) } - for(broker <- Scheduler.cluster.getBrokers) { + for(broker <- Nodes.getBrokers) { exec("broker start " + broker.id + " --timeout 0s") started(broker) } @@ -267,22 +267,22 @@ class CliTest extends MesosTestCase { @Test def topic_list { - exec("topic list") + exec("topic list --cluster default") assertOutContains("no topics") - Scheduler.cluster.topics.addTopic("t0") - Scheduler.cluster.topics.addTopic("t1") - Scheduler.cluster.topics.addTopic("x") + testCluster.topics.addTopic("t0") + testCluster.topics.addTopic("t1") + testCluster.topics.addTopic("x") // list all - exec("topic list") + exec("topic list --cluster default") assertOutContains("topics:") assertOutContains("t0") assertOutContains("t1") assertOutContains("x") // name filtering - exec("topic list t*") + exec("topic list t* --cluster default") assertOutContains("t0") assertOutContains("t1") assertOutNotContains("x") @@ -290,17 +290,17 @@ class CliTest extends MesosTestCase { @Test def topic_add { - exec("topic add t0") + exec("topic add t0 --cluster default") assertOutContains("topic added:") assertOutContains("name: t0") - exec("topic list") + exec("topic list --cluster default") assertOutContains("topic:") assertOutContains("name: t0") assertOutContains("partitions: 0:[0]") - exec("topic add t1 --partition 2") - exec("topic list t1") + exec("topic add t1 --partition 2 --cluster default") + exec("topic list t1 --cluster default") assertOutContains("topic:") assertOutContains("name: t1") assertOutContains("partitions: 0:[0], 1:[0]") @@ -308,12 +308,12 @@ class CliTest extends MesosTestCase { @Test def topic_update { - Scheduler.cluster.topics.addTopic("t0") - exec("topic update t0 --options=flush.ms=5000") + testCluster.topics.addTopic("t0") + exec("topic update t0 --options=flush.ms=5000 --cluster default") assertOutContains("topic updated:") assertOutContains("name: t0") - exec("topic list") + exec("topic list --cluster default") assertOutContains("topic:") assertOutContains("t0") assertOutContains("flush.ms=5000") @@ -321,15 +321,14 @@ class CliTest extends MesosTestCase { @Test def topic_rebalance { - val cluster: Cluster = Scheduler.cluster - val rebalancer: Rebalancer = cluster.rebalancer + val rebalancer: Rebalancer = testCluster.rebalancer - cluster.addBroker(new Broker("0")) - cluster.addBroker(new Broker("1")) + Nodes.addBroker(new Broker("0", testCluster)) + Nodes.addBroker(new 
Broker("1", testCluster)) assertFalse(rebalancer.running) - cluster.topics.addTopic("t") - exec("topic rebalance *") + testCluster.topics.addTopic("t") + exec("topic rebalance * --cluster default") assertTrue(rebalancer.running) assertOutContains("Rebalance started") } @@ -360,6 +359,55 @@ class CliTest extends MesosTestCase { } } + @Test + def cluster_list{ + Nodes.addCluster(new Cluster("c1")) + Nodes.addCluster(new Cluster("c2")) + Nodes.addCluster(new Cluster("c3")) + + exec("cluster list") + assertOutContains("clusters:") + assertOutContains("id: c1") + assertOutContains("id: c2") + assertOutContains("id: c3") + } + + @Test + def cluster_add { + exec("cluster add my_cluster --zk-connect zk://master:2181/kafka_1") + assertOutContains("cluster added:") + assertOutContains("id: my_cluster") + assertOutContains("zk connection string: zk://master:2181/kafka_1") + + assertEquals(2, Nodes.getClusters.size) + val cluster = Nodes.getCluster("my_cluster") + assertEquals("zk://master:2181/kafka_1", cluster.zkConnect) + } + + @Test + def cluster_update { + val cluster = new Cluster("my_cluster") + cluster.zkConnect = "zk://master:2181/k1" + Nodes.addCluster(cluster) + + exec(s"cluster update my_cluster --zk-connect zk://host:port/k1") + assertOutContains("cluster updated:") + assertOutContains("zk connection string: zk://host:port/k1") + + assertEquals("zk://host:port/k1", cluster.zkConnect) + } + + @Test + def cluster_remove { + val cluster = new Cluster("my_cluster") + cluster.zkConnect = "zk://master:2181/k1" + Nodes.addCluster(cluster) + exec("cluster remove my_cluster") + + assertOutContains("cluster my_cluster removed") + assertNull(Nodes.getCluster("my_cluster")) + } + private def assertOutContains(s: String): Unit = assertTrue("" + out, out.toString.contains(s)) private def assertOutNotContains(s: String): Unit = assertFalse("" + out, out.toString.contains(s)) diff --git a/src/test/ly/stealth/mesos/kafka/ClusterTest.scala b/src/test/ly/stealth/mesos/kafka/ClusterTest.scala index 10f653c..19df770 100644 --- a/src/test/ly/stealth/mesos/kafka/ClusterTest.scala +++ b/src/test/ly/stealth/mesos/kafka/ClusterTest.scala @@ -18,65 +18,57 @@ package ly.stealth.mesos.kafka import org.junit.{Before, Test} -import java.util import org.junit.Assert._ -import ly.stealth.mesos.kafka.Broker.State class ClusterTest extends MesosTestCase { - var cluster: Cluster = new Cluster() - @Before override def before { super.before - cluster.clear() + Nodes.clear() } @Test - def addBroker_removeBroker_getBrokers { - assertTrue(cluster.getBrokers.isEmpty) + def addCluster_removeCluster { + assertTrue(Nodes.getBrokers.isEmpty) - val broker0 = cluster.addBroker(new Broker("0")) - val broker1 = cluster.addBroker(new Broker("1")) - assertEquals(util.Arrays.asList(broker0, broker1), cluster.getBrokers) + val cluster0 = Nodes.addCluster(new Cluster("0")) + val cluster1 = Nodes.addCluster(new Cluster("1")) + assertEquals(Set(cluster0, cluster1), Nodes.getClusters.toSet) - cluster.removeBroker(broker0) - assertEquals(util.Arrays.asList(broker1), cluster.getBrokers) + Nodes.removeCluster(cluster0) + assertEquals(Set(cluster1), Nodes.getClusters.toSet) - cluster.removeBroker(broker1) - assertTrue(cluster.getBrokers.isEmpty) + Nodes.removeCluster(cluster1) + assertTrue(Nodes.getClusters.isEmpty) } @Test - def getBroker { - assertNull(cluster.getBroker("0")) + def getCluster { + assertNull(Nodes.getCluster("0")) - val broker0 = cluster.addBroker(new Broker("0")) - assertSame(broker0, cluster.getBroker("0")) + val broker0 = 
Nodes.addCluster(new Cluster("0")) + assertSame(broker0, Nodes.getCluster("0")) } @Test def save_load { - cluster.addBroker(new Broker("0")) - cluster.addBroker(new Broker("1")) - cluster.save() + Nodes.addCluster(new Cluster("0")) + Nodes.addCluster(new Cluster("1")) + Nodes.save() - val read = new Cluster() - read.load() - assertEquals(2, read.getBrokers.size()) + Nodes.load() + assertEquals(2, Nodes.getClusters.size) } @Test def toJson_fromJson { - val broker0 = cluster.addBroker(new Broker("0")) - broker0.task = new Broker.Task("1", "slave", "executor", "host", _state = State.RUNNING) - cluster.addBroker(new Broker("1")) - cluster.frameworkId = "id" + val cluster = Nodes.addCluster(new Cluster("0")) + cluster.zkConnect = "zk://master:2181" - val read = new Cluster() + val read: Cluster = new Cluster() read.fromJson(Util.parseJson("" + cluster.toJson)) - assertEquals(cluster.frameworkId, read.frameworkId) - assertEquals(2, read.getBrokers.size()) - BrokerTest.assertBrokerEquals(broker0, read.getBroker("0")) + assertEquals(cluster.id, read.id) + assertEquals(cluster.zkConnect, read.zkConnect) } } diff --git a/src/test/ly/stealth/mesos/kafka/ExprTest.scala b/src/test/ly/stealth/mesos/kafka/ExprTest.scala index 1fb8d7a..fa5114d 100644 --- a/src/test/ly/stealth/mesos/kafka/ExprTest.scala +++ b/src/test/ly/stealth/mesos/kafka/ExprTest.scala @@ -19,73 +19,69 @@ class ExprTest extends MesosTestCase { @Test def expandBrokers { - val cluster = Scheduler.cluster - for (i <- 0 until 5) - cluster.addBroker(new Broker("" + i)) + Nodes.addBroker(new Broker("" + i, testCluster)) try { - assertEquals(util.Arrays.asList(), Expr.expandBrokers(cluster, "")) + assertEquals(util.Arrays.asList(), Expr.expandBrokers("")) fail() } catch { case e: IllegalArgumentException => } - assertEquals(util.Arrays.asList("0"), Expr.expandBrokers(cluster, "0")) - assertEquals(util.Arrays.asList("0", "2", "4"), Expr.expandBrokers(cluster, "0,2,4")) + assertEquals(util.Arrays.asList("0"), Expr.expandBrokers("0")) + assertEquals(util.Arrays.asList("0", "2", "4"), Expr.expandBrokers("0,2,4")) - assertEquals(util.Arrays.asList("1", "2", "3"), Expr.expandBrokers(cluster, "1..3")) - assertEquals(util.Arrays.asList("0", "1", "3", "4"), Expr.expandBrokers(cluster, "0..1,3..4")) + assertEquals(util.Arrays.asList("1", "2", "3"), Expr.expandBrokers("1..3")) + assertEquals(util.Arrays.asList("0", "1", "3", "4"), Expr.expandBrokers("0..1,3..4")) - assertEquals(util.Arrays.asList("0", "1", "2", "3", "4"), Expr.expandBrokers(cluster, "*")) + assertEquals(util.Arrays.asList("0", "1", "2", "3", "4"), Expr.expandBrokers("*")) // duplicates - assertEquals(util.Arrays.asList("0", "1", "2", "3", "4"), Expr.expandBrokers(cluster, "0..3,2..4")) + assertEquals(util.Arrays.asList("0", "1", "2", "3", "4"), Expr.expandBrokers("0..3,2..4")) // sorting - assertEquals(util.Arrays.asList("2", "3", "4"), Expr.expandBrokers(cluster, "4,3,2")) + assertEquals(util.Arrays.asList("2", "3", "4"), Expr.expandBrokers( "4,3,2")) // not-existent brokers - assertEquals(util.Arrays.asList("5", "6", "7"), Expr.expandBrokers(cluster, "5,6,7")) + assertEquals(util.Arrays.asList("5", "6", "7"), Expr.expandBrokers("5,6,7")) } @Test def expandBrokers_attributes { - val cluster = Scheduler.cluster - val b0 = cluster.addBroker(new Broker("0")) - val b1 = cluster.addBroker(new Broker("1")) - val b2 = cluster.addBroker(new Broker("2")) - cluster.addBroker(new Broker("3")) + val b0 = Nodes.addBroker(new Broker("0", testCluster)) + val b1 = Nodes.addBroker(new Broker("1", 
testCluster)) + val b2 = Nodes.addBroker(new Broker("2", testCluster)) + Nodes.addBroker(new Broker("3", testCluster)) b0.task = new Broker.Task(_hostname = "master", _attributes = Util.parseMap("a=1")) b1.task = new Broker.Task(_hostname = "slave0", _attributes = Util.parseMap("a=2,b=2")) b2.task = new Broker.Task(_hostname = "slave1", _attributes = Util.parseMap("b=2")) // exact match - assertEquals(util.Arrays.asList("0", "1", "2", "3"), Expr.expandBrokers(cluster, "*")) - assertEquals(util.Arrays.asList("0"), Expr.expandBrokers(cluster, "*[a=1]")) - assertEquals(util.Arrays.asList("1", "2"), Expr.expandBrokers(cluster, "*[b=2]")) + assertEquals(util.Arrays.asList("0", "1", "2", "3"), Expr.expandBrokers("*")) + assertEquals(util.Arrays.asList("0"), Expr.expandBrokers("*[a=1]")) + assertEquals(util.Arrays.asList("1", "2"), Expr.expandBrokers("*[b=2]")) // attribute present - assertEquals(util.Arrays.asList("0", "1"), Expr.expandBrokers(cluster, "*[a]")) - assertEquals(util.Arrays.asList("1", "2"), Expr.expandBrokers(cluster, "*[b]")) + assertEquals(util.Arrays.asList("0", "1"), Expr.expandBrokers("*[a]")) + assertEquals(util.Arrays.asList("1", "2"), Expr.expandBrokers("*[b]")) // hostname - assertEquals(util.Arrays.asList("0"), Expr.expandBrokers(cluster, "*[hostname=master]")) - assertEquals(util.Arrays.asList("1", "2"), Expr.expandBrokers(cluster, "*[hostname=slave*]")) + assertEquals(util.Arrays.asList("0"), Expr.expandBrokers("*[hostname=master]")) + assertEquals(util.Arrays.asList("1", "2"), Expr.expandBrokers("*[hostname=slave*]")) // not existent broker - assertEquals(util.Arrays.asList(), Expr.expandBrokers(cluster, "5[a]")) - assertEquals(util.Arrays.asList(), Expr.expandBrokers(cluster, "5[]")) + assertEquals(util.Arrays.asList(), Expr.expandBrokers("5[a]")) + assertEquals(util.Arrays.asList(), Expr.expandBrokers("5[]")) } @Test def expandBrokers_sortByAttrs { - val cluster = Scheduler.cluster - val b0 = cluster.addBroker(new Broker("0")) - val b1 = cluster.addBroker(new Broker("1")) - val b2 = cluster.addBroker(new Broker("2")) - val b3 = cluster.addBroker(new Broker("3")) - val b4 = cluster.addBroker(new Broker("4")) - val b5 = cluster.addBroker(new Broker("5")) + val b0 = Nodes.addBroker(new Broker("0", testCluster)) + val b1 = Nodes.addBroker(new Broker("1", testCluster)) + val b2 = Nodes.addBroker(new Broker("2", testCluster)) + val b3 = Nodes.addBroker(new Broker("3", testCluster)) + val b4 = Nodes.addBroker(new Broker("4", testCluster)) + val b5 = Nodes.addBroker(new Broker("5", testCluster)) b0.task = new Broker.Task(_attributes = Util.parseMap("r=2,a=1")) b1.task = new Broker.Task(_attributes = Util.parseMap("r=0,a=1")) @@ -94,28 +90,27 @@ class ExprTest extends MesosTestCase { b4.task = new Broker.Task(_attributes = Util.parseMap("r=0,a=2")) b5.task = new Broker.Task(_attributes = Util.parseMap("r=0,a=2")) - assertEquals(util.Arrays.asList("0", "1", "2", "3", "4", "5"), Expr.expandBrokers(cluster, "*", sortByAttrs = true)) - assertEquals(util.Arrays.asList("1", "2", "0", "4", "3", "5"), Expr.expandBrokers(cluster, "*[r]", sortByAttrs = true)) - assertEquals(util.Arrays.asList("1", "4", "2", "3", "0", "5"), Expr.expandBrokers(cluster, "*[r,a]", sortByAttrs = true)) + assertEquals(util.Arrays.asList("0", "1", "2", "3", "4", "5"), Expr.expandBrokers("*", sortByAttrs = true)) + assertEquals(util.Arrays.asList("1", "2", "0", "4", "3", "5"), Expr.expandBrokers("*[r]", sortByAttrs = true)) + assertEquals(util.Arrays.asList("1", "4", "2", "3", "0", "5"), 
Expr.expandBrokers("*[r,a]", sortByAttrs = true)) - assertEquals(util.Arrays.asList("1", "2", "0"), Expr.expandBrokers(cluster, "*[r=*,a=1]", sortByAttrs = true)) - assertEquals(util.Arrays.asList("4", "3", "5"), Expr.expandBrokers(cluster, "*[r,a=2]", sortByAttrs = true)) + assertEquals(util.Arrays.asList("1", "2", "0"), Expr.expandBrokers("*[r=*,a=1]", sortByAttrs = true)) + assertEquals(util.Arrays.asList("4", "3", "5"), Expr.expandBrokers("*[r,a=2]", sortByAttrs = true)) } @Test def expandTopics { - val cluster = Scheduler.cluster - val topics: Topics = cluster.topics + val topics: Topics = testCluster.topics topics.addTopic("t0") topics.addTopic("t1") topics.addTopic("x") - assertEquals(util.Arrays.asList(), Expr.expandTopics("")) - assertEquals(util.Arrays.asList("t5", "t6"), Expr.expandTopics("t5,t6")) - assertEquals(util.Arrays.asList("t0"), Expr.expandTopics("t0")) - assertEquals(util.Arrays.asList("t0", "t1"), Expr.expandTopics("t0, t1")) - assertEquals(util.Arrays.asList("t0", "t1", "x"), Expr.expandTopics("*")) - assertEquals(util.Arrays.asList("t0", "t1"), Expr.expandTopics("t*")) + assertEquals(util.Arrays.asList(), Expr.expandTopics("", testCluster)) + assertEquals(util.Arrays.asList("t5", "t6"), Expr.expandTopics("t5,t6", testCluster)) + assertEquals(util.Arrays.asList("t0"), Expr.expandTopics("t0", testCluster)) + assertEquals(util.Arrays.asList("t0", "t1"), Expr.expandTopics("t0, t1", testCluster)) + assertEquals(util.Arrays.asList("t0", "t1", "x"), Expr.expandTopics("*", testCluster)) + assertEquals(util.Arrays.asList("t0", "t1"), Expr.expandTopics("t*", testCluster)) } } diff --git a/src/test/ly/stealth/mesos/kafka/HttpServerTest.scala b/src/test/ly/stealth/mesos/kafka/HttpServerTest.scala index c4e267f..3f103aa 100644 --- a/src/test/ly/stealth/mesos/kafka/HttpServerTest.scala +++ b/src/test/ly/stealth/mesos/kafka/HttpServerTest.scala @@ -47,15 +47,15 @@ class HttpServerTest extends MesosTestCase { @Test def broker_add { - val json = sendRequest("/broker/add", parseMap("broker=0,cpus=0.1,mem=128")) + val json = sendRequest("/broker/add", parseMap("broker=0,cpus=0.1,mem=128,cluster=default")) val brokerNodes = json("brokers").asInstanceOf[List[Map[String, Object]]] assertEquals(1, brokerNodes.size) val responseBroker = new Broker() - responseBroker.fromJson(brokerNodes(0)) + responseBroker.fromJson(brokerNodes.head, expanded = true) - assertEquals(1, Scheduler.cluster.getBrokers.size()) - val broker = Scheduler.cluster.getBrokers.get(0) + assertEquals(1, Nodes.getBrokers.size) + val broker = Nodes.getBrokers.get(0) assertEquals("0", broker.id) assertEquals(0.1, broker.cpus, 0.001) assertEquals(128, broker.mem) @@ -65,24 +65,24 @@ class HttpServerTest extends MesosTestCase { @Test def broker_add_range { - val json = sendRequest("/broker/add", parseMap("broker=0..4")) + val json = sendRequest("/broker/add", parseMap("broker=0..4,cluster=default")) val brokerNodes = json("brokers").asInstanceOf[List[Map[String, Object]]] assertEquals(5, brokerNodes.size) - assertEquals(5, Scheduler.cluster.getBrokers.size) + assertEquals(5, Nodes.getBrokers.size) } @Test def broker_update { - sendRequest("/broker/add", parseMap("broker=0")) + sendRequest("/broker/add", parseMap("broker=0,cluster=default")) var json = sendRequest("/broker/update", parseMap("broker=0,cpus=1,heap=128,failoverDelay=5s")) val brokerNodes = json("brokers").asInstanceOf[List[Map[String, Object]]] assertEquals(1, brokerNodes.size) val responseBroker = new Broker() - responseBroker.fromJson(brokerNodes(0)) + 
responseBroker.fromJson(brokerNodes.head, expanded = true) - val broker = Scheduler.cluster.getBroker("0") + val broker = Nodes.getBroker("0") assertEquals(1, broker.cpus, 0.001) assertEquals(128, broker.heap) assertEquals(new Period("5s"), broker.failover.delay) @@ -120,17 +120,16 @@ class HttpServerTest extends MesosTestCase { @Test def broker_list { - val cluster = Scheduler.cluster - cluster.addBroker(new Broker("0")) - cluster.addBroker(new Broker("1")) - cluster.addBroker(new Broker("2")) + Nodes.addBroker(new Broker("0", testCluster)) + Nodes.addBroker(new Broker("1", testCluster)) + Nodes.addBroker(new Broker("2", testCluster)) var json = sendRequest("/broker/list", parseMap(null)) var brokerNodes = json("brokers").asInstanceOf[List[Map[String, Object]]] assertEquals(3, brokerNodes.size) val broker = new Broker() - broker.fromJson(brokerNodes(0)) + broker.fromJson(brokerNodes.head, expanded = true) assertEquals("0", broker.id) // filtering @@ -141,26 +140,24 @@ class HttpServerTest extends MesosTestCase { @Test def broker_remove { - val cluster = Scheduler.cluster - cluster.addBroker(new Broker("0")) - cluster.addBroker(new Broker("1")) - cluster.addBroker(new Broker("2")) + Nodes.addBroker(new Broker("0", testCluster)) + Nodes.addBroker(new Broker("1", testCluster)) + Nodes.addBroker(new Broker("2", testCluster)) var json = sendRequest("/broker/remove", parseMap("broker=1")) assertEquals("1", json("ids")) - assertEquals(2, cluster.getBrokers.size) - assertNull(cluster.getBroker("1")) + assertEquals(2, Nodes.getBrokers.size) + assertNull(Nodes.getBroker("1")) json = sendRequest("/broker/remove", parseMap("broker=*")) assertEquals("0,2", json("ids")) - assertTrue(cluster.getBrokers.isEmpty) + assertTrue(Nodes.getBrokers.isEmpty) } @Test def broker_start_stop { - val cluster = Scheduler.cluster - val broker0 = cluster.addBroker(new Broker("0")) - val broker1 = cluster.addBroker(new Broker("1")) + val broker0 = Nodes.addBroker(new Broker("0", testCluster)) + val broker1 = Nodes.addBroker(new Broker("1", testCluster)) var json = sendRequest("/broker/start", parseMap("broker=*,timeout=0s")) assertEquals(2, json("brokers").asInstanceOf[List[Map[String, Object]]].size) @@ -189,8 +186,8 @@ class HttpServerTest extends MesosTestCase { assertErrorContains("broker=0,timeout=0s", "broker 0 not found") - val broker0 = Scheduler.cluster.addBroker(new Broker("0")) - val broker1 = Scheduler.cluster.addBroker(new Broker("1")) + val broker0 = Nodes.addBroker(new Broker("0", testCluster)) + val broker1 = Nodes.addBroker(new Broker("1", testCluster)) assertErrorContains("broker=0,timeout=0s", "broker 0 is not running") @@ -253,20 +250,20 @@ class HttpServerTest extends MesosTestCase { assertEquals(json("status"), "restarted") for((brokerJson, expectedBroker) <- json("brokers").asInstanceOf[List[Map[String, Object]]].zip(Seq(broker0, broker1))) { val actualBroker = new Broker() - actualBroker.fromJson(brokerJson) + actualBroker.fromJson(brokerJson, expanded = true) BrokerTest.assertBrokerEquals(expectedBroker, actualBroker) } } @Test def topic_list { - var json = sendRequest("/topic/list", parseMap("")) + var json = sendRequest("/topic/list", parseMap("cluster=default")) assertTrue(json("topics").asInstanceOf[List[Map[String, Object]]].isEmpty) - Scheduler.cluster.topics.addTopic("t0") - Scheduler.cluster.topics.addTopic("t1") + testCluster.topics.addTopic("t0") + testCluster.topics.addTopic("t1") - json = sendRequest("/topic/list", parseMap("")) + json = sendRequest("/topic/list", 
parseMap("cluster=default")) val topicNodes: List[Map[String, Object]] = json("topics").asInstanceOf[List[Map[String, Object]]] assertEquals(2, topicNodes.size) @@ -277,10 +274,10 @@ class HttpServerTest extends MesosTestCase { @Test def topic_add { - val topics = Scheduler.cluster.topics + val topics = testCluster.topics // add t0 topic - var json = sendRequest("/topic/add", parseMap("topic=t0")) + var json = sendRequest("/topic/add", parseMap("topic=t0,cluster=default")) val t0Node = json("topics").asInstanceOf[List[Map[String, Object]]](0) assertEquals("t0", t0Node("name")) assertEquals(Map("0" -> "0"), t0Node("partitions")) @@ -288,7 +285,7 @@ class HttpServerTest extends MesosTestCase { assertEquals("t0", topics.getTopic("t0").name) // add t1 topic - json = sendRequest("/topic/add", parseMap("topic=t1,partitions=2,options=flush.ms\\=1000")) + json = sendRequest("/topic/add", parseMap("topic=t1,partitions=2,options=flush.ms\\=1000,cluster=default")) val topicNode = json("topics").asInstanceOf[List[Map[String, Object]]](0) assertEquals("t1", topicNode("name")) @@ -304,11 +301,11 @@ class HttpServerTest extends MesosTestCase { @Test def topic_update { - val topics = Scheduler.cluster.topics + val topics = testCluster.topics topics.addTopic("t") // update topic t - val json = sendRequest("/topic/update", parseMap("topic=t,options=flush.ms\\=1000")) + val json = sendRequest("/topic/update", parseMap("topic=t,options=flush.ms\\=1000,cluster=default")) val topicNode = json("topics").asInstanceOf[List[Map[String, Object]]](0) assertEquals("t", topicNode("name")) @@ -319,15 +316,14 @@ class HttpServerTest extends MesosTestCase { @Test def topic_rebalance { - val cluster = Scheduler.cluster - cluster.addBroker(new Broker("0")) - cluster.addBroker(new Broker("1")) + Nodes.addBroker(new Broker("0", testCluster)) + Nodes.addBroker(new Broker("1", testCluster)) - val rebalancer: TestRebalancer = cluster.rebalancer.asInstanceOf[TestRebalancer] + val rebalancer: TestRebalancer = testCluster.rebalancer.asInstanceOf[TestRebalancer] assertFalse(rebalancer.running) - cluster.topics.addTopic("t") - val json = sendRequest("/topic/rebalance", parseMap("topic=*")) + testCluster.topics.addTopic("t") + val json = sendRequest("/topic/rebalance", parseMap("topic=*,cluster=default")) assertTrue(rebalancer.running) assertEquals("started", json("status")) @@ -335,6 +331,51 @@ class HttpServerTest extends MesosTestCase { assertEquals(rebalancer.state, json("state").asInstanceOf[String]) } + @Test + def cluster_list { + Nodes.addCluster(new Cluster("c1")) + Nodes.addCluster(new Cluster("c2")) + + val json = sendRequest("/cluster/list", parseMap(null)) + val clusters = json("clusters").asInstanceOf[List[Map[String, Any]]].map(j => new Cluster(j)) + assertEquals(3, clusters.size) + + assertTrue(clusters.exists(_.id == "c1")) + assertTrue(clusters.exists(_.id == "c2")) + } + + @Test + def cluster_add { + val json = sendRequest("/cluster/add", parseMap("cluster=c1,zkConnect=192.168.0.1:2181/kafka1")) + val clusterNodes = json("clusters").asInstanceOf[List[Map[String, Object]]] + + assertEquals(1, clusterNodes.size) + val responseCluster = new Cluster(clusterNodes.head) + + assertEquals(2, Nodes.getClusters.size) + assertEquals("c1", responseCluster.id) + assertEquals("192.168.0.1:2181/kafka1", responseCluster.zkConnect) + } + + @Test + def cluster_remove { + val c1 = new Cluster("c1") + Nodes.addCluster(c1) + val c2 = new Cluster("c2") + Nodes.addCluster(new Cluster("c2")) + + val json = sendRequest("/cluster/remove", 
parseMap("cluster=c1")) + + assertEquals("c1", json("id").asInstanceOf[String]) + assertEquals(Set("c2", "default"), Nodes.getClusters.map(_.id).toSet) + + // cluster contains nodes + Nodes.addBroker(new Broker("0", c2)) + try { + sendRequest("/cluster/remove", parseMap("cluster=c2")); fail("") + } catch { case e: IOException => } + } + @Test def jar_download { val file = download("/jar/kafka-mesos.jar") diff --git a/src/test/ly/stealth/mesos/kafka/MesosTestCase.scala b/src/test/ly/stealth/mesos/kafka/MesosTestCase.scala index a86f928..42dfe92 100644 --- a/src/test/ly/stealth/mesos/kafka/MesosTestCase.scala +++ b/src/test/ly/stealth/mesos/kafka/MesosTestCase.scala @@ -31,7 +31,6 @@ import org.apache.log4j.BasicConfigurator import java.io.{FileWriter, File} import com.google.protobuf.ByteString import java.util.concurrent.atomic.AtomicBoolean -import ly.stealth.mesos.kafka.Cluster.FsStorage import org.I0Itec.zkclient.{ZkClient, IDefaultNameSpace, ZkServer} import java.net.ServerSocket import scala.concurrent.duration.Duration @@ -43,6 +42,7 @@ class MesosTestCase { var schedulerDriver: TestSchedulerDriver = null var executorDriver: TestExecutorDriver = null + var testCluster: Cluster = null @Before def before { @@ -51,18 +51,23 @@ class MesosTestCase { val storageFile = File.createTempFile(getClass.getSimpleName, null) storageFile.delete() - Cluster.storage = new FsStorage(storageFile) + Nodes.storage = new FsStorage(storageFile) Config.api = "http://localhost:7000" - Config.zk = "localhost" - Scheduler.cluster.clear() - Scheduler.cluster.rebalancer = new TestRebalancer() + Nodes.clear() + + testCluster = new Cluster("default") + testCluster.zkConnect = "localhost" + Nodes.addCluster(testCluster) + + testCluster.rebalancer = new TestRebalancer(() => testCluster.zkConnect) Scheduler.reconciles = 0 Scheduler.reconcileTime = null schedulerDriver = _schedulerDriver Scheduler.registered(schedulerDriver, frameworkId(), master()) + Scheduler.logs.clear() executorDriver = _executorDriver Executor.server = new TestBrokerServer() @@ -87,11 +92,11 @@ class MesosTestCase { def after { Scheduler.disconnected(schedulerDriver) - Scheduler.cluster.rebalancer = new Rebalancer() + testCluster.rebalancer = new TestRebalancer(() => "localhost") - val storage = Cluster.storage.asInstanceOf[FsStorage] + val storage = Nodes.storage.asInstanceOf[FsStorage] storage.file.delete() - Cluster.storage = new FsStorage(FsStorage.DEFAULT_FILE) + Nodes.storage = new FsStorage(FsStorage.DEFAULT_FILE) Executor.server.stop() Executor.server = new KafkaServer() @@ -100,7 +105,7 @@ class MesosTestCase { def startZkServer() { val port = findFreePort - Config.zk = s"localhost:$port" + testCluster.zkConnect = s"localhost:$port" zkDir = File.createTempFile(getClass.getName, null) zkDir.delete() @@ -297,7 +302,7 @@ class MesosTestCase { id: String = "" + UUID.randomUUID(), name: String = "Task", slaveId: String = "" + UUID.randomUUID(), - data: String = Util.formatMap(Collections.singletonMap("broker", new Broker().toJson)) + data: String = Util.formatMap(Collections.singletonMap("broker", new Broker("0", testCluster).toJson(expanded = true))) ): TaskInfo = { val builder = TaskInfo.newBuilder() .setName(id) @@ -493,7 +498,7 @@ class TestBrokerServer extends BrokerServer { def getClassLoader: ClassLoader = getClass.getClassLoader } -class TestRebalancer extends Rebalancer { +class TestRebalancer(zkConnect: () => String) extends Rebalancer(zkConnect) { var _running: Boolean = false var _failOnStart: Boolean = false diff --git 
a/src/test/ly/stealth/mesos/kafka/RebalancerTest.scala b/src/test/ly/stealth/mesos/kafka/RebalancerTest.scala index 2f414c1..c2ca07a 100644 --- a/src/test/ly/stealth/mesos/kafka/RebalancerTest.scala +++ b/src/test/ly/stealth/mesos/kafka/RebalancerTest.scala @@ -25,20 +25,19 @@ import scala.collection.JavaConversions._ import java.util class RebalancerTest extends MesosTestCase { - var rebalancer: Rebalancer = null var zkClient: ZkClient = null @Before override def before { super.before - rebalancer = new Rebalancer() val port = 56789 - Config.zk = s"localhost:$port" + testCluster.zkConnect = s"localhost:$port" startZkServer() zkClient = zkServer.getZkClient zkClient.setZkSerializer(ZKStringSerializer) + testCluster.rebalancer = new Rebalancer(() => testCluster.zkConnect) } @After @@ -49,24 +48,23 @@ class RebalancerTest extends MesosTestCase { @Test def start { - val cluster = Scheduler.cluster - cluster.addBroker(new Broker("0")) - cluster.addBroker(new Broker("1")) + Nodes.addBroker(new Broker("0", testCluster)) + Nodes.addBroker(new Broker("1", testCluster)) - cluster.topics.addTopic("topic", Map(0 -> util.Arrays.asList(0), 1 -> util.Arrays.asList(0))) - assertFalse(rebalancer.running) - rebalancer.start(util.Arrays.asList("topic"), util.Arrays.asList("0", "1")) + testCluster.topics.addTopic("topic", Map(0 -> util.Arrays.asList(0), 1 -> util.Arrays.asList(0))) + assertFalse(testCluster.rebalancer.running) + testCluster.rebalancer.start(util.Arrays.asList("topic"), util.Arrays.asList("0", "1")) - assertTrue(rebalancer.running) - assertFalse(rebalancer.state.isEmpty) + assertTrue(testCluster.rebalancer.running) + assertFalse(testCluster.rebalancer.state.isEmpty) } @Test def start_in_progress { - Scheduler.cluster.topics.addTopic("topic", Map(0 -> util.Arrays.asList(0), 1 -> util.Arrays.asList(0))) + testCluster.topics.addTopic("topic", Map(0 -> util.Arrays.asList(0), 1 -> util.Arrays.asList(0))) ZkUtils.createPersistentPath(zkClient, ZkUtils.ReassignPartitionsPath, "") - try { rebalancer.start(util.Arrays.asList("t1"), util.Arrays.asList("0", "1")); fail() } + try { testCluster.rebalancer.start(util.Arrays.asList("t1"), util.Arrays.asList("0", "1")); fail() } catch { case e: Rebalancer.Exception => assertTrue(e.getMessage, e.getMessage.contains("in progress")) } } } diff --git a/src/test/ly/stealth/mesos/kafka/SchedulerTest.scala b/src/test/ly/stealth/mesos/kafka/SchedulerTest.scala index c26dcea..2a610f5 100644 --- a/src/test/ly/stealth/mesos/kafka/SchedulerTest.scala +++ b/src/test/ly/stealth/mesos/kafka/SchedulerTest.scala @@ -44,7 +44,7 @@ class SchedulerTest extends MesosTestCase { @Test def newTask { - val broker = new Broker("1") + val broker = new Broker("1", testCluster) broker.options = Util.parseMap("a=1") broker.log4jOptions = Util.parseMap("b=2") broker.cpus = 0.5 @@ -64,13 +64,13 @@ class SchedulerTest extends MesosTestCase { val data: util.Map[String, String] = Util.parseMap(task.getData.toStringUtf8) val readBroker: Broker = new Broker() - readBroker.fromJson(Util.parseJson(data.get("broker"))) + readBroker.fromJson(Util.parseJson(data.get("broker")), expanded = true) BrokerTest.assertBrokerEquals(broker, readBroker) val defaults = Util.parseMap(data.get("defaults")) assertEquals(broker.id, defaults.get("broker.id")) assertEquals("" + 1000, defaults.get("port")) - assertEquals(Config.zk, defaults.get("zookeeper.connect")) + assertEquals(testCluster.zkConnect, defaults.get("zookeeper.connect")) assertEquals("kafka-logs", defaults.get("log.dirs")) 
assertEquals(offer.getHostname, defaults.get("host.name")) @@ -78,7 +78,7 @@ class SchedulerTest extends MesosTestCase { @Test def syncBrokers { - val broker = Scheduler.cluster.addBroker(new Broker()) + val broker = Nodes.addBroker(new Broker("0", testCluster)) val offer = this.offer(resources = s"cpus:${broker.cpus}; mem:${broker.mem}; ports:1000") // broker !active @@ -101,7 +101,7 @@ class SchedulerTest extends MesosTestCase { @Test def acceptOffer { - val broker = Scheduler.cluster.addBroker(new Broker()) + val broker = Nodes.addBroker(new Broker("0", testCluster)) broker.active = true broker.task = new Broker.Task(_state = Broker.State.RECONCILING) @@ -119,7 +119,7 @@ class SchedulerTest extends MesosTestCase { @Test def onBrokerStatus { - val broker = Scheduler.cluster.addBroker(new Broker()) + val broker = Nodes.addBroker(new Broker("0", testCluster)) broker.task = new Broker.Task(Broker.nextTaskId(broker), "slave", "executor", "host") assertEquals(Broker.State.STARTING, broker.task.state) @@ -136,7 +136,7 @@ class SchedulerTest extends MesosTestCase { @Test def onBrokerStarted { - val broker = Scheduler.cluster.addBroker(new Broker()) + val broker = Nodes.addBroker(new Broker()) broker.task = new Broker.Task("task") assertEquals(Broker.State.STARTING, broker.task.state) @@ -147,7 +147,7 @@ class SchedulerTest extends MesosTestCase { @Test def onBrokerStopped { - val broker = Scheduler.cluster.addBroker(new Broker()) + val broker = Nodes.addBroker(new Broker()) val task = new Broker.Task("task", _state = Broker.State.RUNNING) // finished @@ -182,7 +182,7 @@ class SchedulerTest extends MesosTestCase { @Test def launchTask { - val broker = Scheduler.cluster.addBroker(new Broker("100")) + val broker = Nodes.addBroker(new Broker("100", testCluster)) val offer = this.offer(resources = s"cpus:${broker.cpus}; mem:${broker.mem}", attributes = "a=1,b=2") broker.needsRestart = true @@ -201,12 +201,12 @@ class SchedulerTest extends MesosTestCase { @Test def reconcileTasksIfRequired { Scheduler.reconcileTime = null - val broker0 = Scheduler.cluster.addBroker(new Broker("0")) + val broker0 = Nodes.addBroker(new Broker("0")) - val broker1 = Scheduler.cluster.addBroker(new Broker("1")) + val broker1 = Nodes.addBroker(new Broker("1")) broker1.task = new Broker.Task(_id = "1", _state = Broker.State.RUNNING) - val broker2 = Scheduler.cluster.addBroker(new Broker("2")) + val broker2 = Nodes.addBroker(new Broker("2")) broker2.task = new Broker.Task(_id = "2", _state = Broker.State.STARTING) Scheduler.reconcileTasksIfRequired(force = true, now = new Date(0)) @@ -232,10 +232,10 @@ class SchedulerTest extends MesosTestCase { @Test def otherTasksAttributes { - val broker0 = Scheduler.cluster.addBroker(new Broker("0")) + val broker0 = Nodes.addBroker(new Broker("0")) broker0.task = new Broker.Task(_hostname = "host0", _attributes = Util.parseMap("a=1,b=2")) - val broker1 = Scheduler.cluster.addBroker(new Broker("1")) + val broker1 = Nodes.addBroker(new Broker("1")) broker1.task = new Broker.Task(_hostname = "host1", _attributes = Util.parseMap("b=3")) assertArrayEquals(Array[AnyRef]("host0", "host1"), Scheduler.otherTasksAttributes("hostname").asInstanceOf[Array[AnyRef]]) @@ -245,9 +245,9 @@ class SchedulerTest extends MesosTestCase { @Test def onFrameworkMessage = { - val broker0 = Scheduler.cluster.addBroker(new Broker("0")) + val broker0 = Nodes.addBroker(new Broker("0")) broker0.active = true - val broker1 = Scheduler.cluster.addBroker(new Broker("1")) + val broker1 = Nodes.addBroker(new Broker("1")) 
broker1.active = true assertNull(broker0.metrics) @@ -292,7 +292,7 @@ class SchedulerTest extends MesosTestCase { @Test def sendReceiveBrokerLog = { - val broker = Scheduler.cluster.addBroker(new Broker("0")) + val broker = Nodes.addBroker(new Broker("0")) broker.task = new Broker.Task("task-id", "slave-id", "executor-id") // driver connected diff --git a/src/test/ly/stealth/mesos/kafka/TopicsTest.scala b/src/test/ly/stealth/mesos/kafka/TopicsTest.scala index eb7c50c..f7ae650 100644 --- a/src/test/ly/stealth/mesos/kafka/TopicsTest.scala +++ b/src/test/ly/stealth/mesos/kafka/TopicsTest.scala @@ -1,6 +1,6 @@ package ly.stealth.mesos.kafka -import org.junit.{After, Before, Test} +import org.junit.{BeforeClass, After, Before, Test} import org.junit.Assert._ import ly.stealth.mesos.kafka.Topics.Topic import java.util @@ -12,7 +12,7 @@ class TopicsTest extends MesosTestCase { override def before { super.before startZkServer() - topics = Scheduler.cluster.topics + topics = testCluster.topics } @After From 7b4da235d6fcd028baa34f4b2e763eced3465965 Mon Sep 17 00:00:00 2001 From: Andrii Biletskyi Date: Tue, 29 Mar 2016 17:46:56 +0300 Subject: [PATCH 2/5] MK-2 - minor post merge fixes --- kafka-mesos.properties | 2 +- .../ly/stealth/mesos/kafka/HttpServer.scala | 4 +-- .../ly/stealth/mesos/kafka/ExecutorTest.scala | 4 +-- .../mesos/kafka/KafkaMesosTestCase.scala | 26 +++++++++++-------- 4 files changed, 20 insertions(+), 16 deletions(-) diff --git a/kafka-mesos.properties b/kafka-mesos.properties index 7e47542..b8c980c 100644 --- a/kafka-mesos.properties +++ b/kafka-mesos.properties @@ -3,7 +3,7 @@ debug=true user=vagrant -storage=zk://master:2181/chroot/mesos-kafka-scheduler +storage=master:2181/chroot/mesos-kafka-scheduler master=master:5050 diff --git a/src/scala/ly/stealth/mesos/kafka/HttpServer.scala b/src/scala/ly/stealth/mesos/kafka/HttpServer.scala index 8184fc6..236c5a7 100644 --- a/src/scala/ly/stealth/mesos/kafka/HttpServer.scala +++ b/src/scala/ly/stealth/mesos/kafka/HttpServer.scala @@ -167,7 +167,7 @@ object HttpServer { return } - val errors = new util.ArrayList[String]() + val errors = new util.ArrayList[String] var cluster = Nodes.getCluster(id) if (add && cluster != null) errors.add("duplicate cluster") @@ -177,7 +177,7 @@ object HttpServer { errors.add("cluster has active nodes") if (add && zkConnect == null || zkConnect.trim.isEmpty) - errors.add(404, "cluster has active nodes") + errors.add("zk connection string is empty") if (!errors.isEmpty) { response.sendError(400, errors.mkString("; ")); return } diff --git a/src/test/ly/stealth/mesos/kafka/ExecutorTest.scala b/src/test/ly/stealth/mesos/kafka/ExecutorTest.scala index 8b06bb2..074ff8d 100644 --- a/src/test/ly/stealth/mesos/kafka/ExecutorTest.scala +++ b/src/test/ly/stealth/mesos/kafka/ExecutorTest.scala @@ -26,7 +26,7 @@ import net.elodina.mesos.util.Strings class ExecutorTest extends KafkaMesosTestCase { @Test(timeout = 5000) def startBroker_success { - val data: String = Strings.formatMap(util.Collections.singletonMap("broker", "" + new Broker().toJson)) + val data: String = Strings.formatMap(util.Collections.singletonMap("broker", "" + new Broker("0", testCluster).toJson(expanded = true))) Executor.startBroker(executorDriver, task("id", "task", "slave", data)) executorDriver.waitForStatusUpdates(1) assertEquals(1, executorDriver.statusUpdates.size()) @@ -73,7 +73,7 @@ class ExecutorTest extends KafkaMesosTestCase { @Test(timeout = 5000) def launchTask { - val data: String = 
Strings.formatMap(util.Collections.singletonMap("broker", "" + new Broker().toJson)) + val data: String = Strings.formatMap(util.Collections.singletonMap("broker", "" + new Broker("0", testCluster).toJson(expanded = true))) Executor.launchTask(executorDriver, task("id", "task", "slave", data)) executorDriver.waitForStatusUpdates(1) diff --git a/src/test/ly/stealth/mesos/kafka/KafkaMesosTestCase.scala b/src/test/ly/stealth/mesos/kafka/KafkaMesosTestCase.scala index 11558fa..2e8ccb1 100644 --- a/src/test/ly/stealth/mesos/kafka/KafkaMesosTestCase.scala +++ b/src/test/ly/stealth/mesos/kafka/KafkaMesosTestCase.scala @@ -1,9 +1,8 @@ package ly.stealth.mesos.kafka -import java.io.{FileWriter, File} +import java.io.File import org.I0Itec.zkclient.{ZkClient, IDefaultNameSpace, ZkServer} import org.apache.log4j.BasicConfigurator -import ly.stealth.mesos.kafka.Cluster.FsStorage import net.elodina.mesos.util.{IO, Net, Version} import org.junit.{Ignore, Before, After} import scala.concurrent.duration.Duration @@ -14,6 +13,7 @@ import java.util class KafkaMesosTestCase extends net.elodina.mesos.test.MesosTestCase { var zkDir: File = null var zkServer: ZkServer = null + var testCluster: Cluster = null @Before def before { @@ -21,13 +21,17 @@ class KafkaMesosTestCase extends net.elodina.mesos.test.MesosTestCase { val storageFile = File.createTempFile(getClass.getSimpleName, null) storageFile.delete() - Cluster.storage = new FsStorage(storageFile) + Nodes.storage = new FsStorage(storageFile) Config.api = "http://localhost:7000" - Config.zk = "localhost" - Scheduler.cluster.clear() - Scheduler.cluster.rebalancer = new TestRebalancer() + Nodes.clear() + + testCluster = new Cluster("default") + testCluster.zkConnect = "localhost" + Nodes.addCluster(testCluster) + + testCluster.rebalancer = new TestRebalancer(() => testCluster.zkConnect) Scheduler.reconciles = 0 Scheduler.reconcileTime = null @@ -51,11 +55,11 @@ class KafkaMesosTestCase extends net.elodina.mesos.test.MesosTestCase { def after { Scheduler.disconnected(schedulerDriver) - Scheduler.cluster.rebalancer = new Rebalancer() + testCluster.rebalancer = new TestRebalancer(() => "localhost") - val storage = Cluster.storage.asInstanceOf[FsStorage] + val storage = Nodes.storage.asInstanceOf[FsStorage] storage.file.delete() - Cluster.storage = new FsStorage(FsStorage.DEFAULT_FILE) + Nodes.storage = new FsStorage(FsStorage.DEFAULT_FILE) Executor.server.stop() Executor.server = new KafkaServer() @@ -64,7 +68,7 @@ class KafkaMesosTestCase extends net.elodina.mesos.test.MesosTestCase { def startZkServer() { val port = Net.findAvailPort - Config.zk = s"localhost:$port" + testCluster.zkConnect = s"localhost:$port" zkDir = File.createTempFile(getClass.getName, null) zkDir.delete() @@ -138,7 +142,7 @@ class TestBrokerServer extends BrokerServer { def getClassLoader: ClassLoader = getClass.getClassLoader } -class TestRebalancer extends Rebalancer { +class TestRebalancer(zkConnect: () => String) extends Rebalancer(zkConnect) { var _running: Boolean = false var _failOnStart: Boolean = false From 5ae21794b987a502ae625de8b62ef1bb3b6b40e1 Mon Sep 17 00:00:00 2001 From: Andrii Biletskyi Date: Tue, 29 Mar 2016 17:59:06 +0300 Subject: [PATCH 3/5] MK-2 - cosmetic CLI opts fix --- src/scala/ly/stealth/mesos/kafka/Cli.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/scala/ly/stealth/mesos/kafka/Cli.scala b/src/scala/ly/stealth/mesos/kafka/Cli.scala index b92e792..b12a8e7 100644 --- a/src/scala/ly/stealth/mesos/kafka/Cli.scala +++ 
b/src/scala/ly/stealth/mesos/kafka/Cli.scala
@@ -450,7 +450,7 @@ object Cli {
   private def handleAddUpdate(expr: String, args: Array[String], add: Boolean, help: Boolean = false): Unit = {
     val parser = newParser()
-    parser.accepts("cluster", "Cluster id").withRequiredArg().ofType(classOf[String])
+    parser.accepts("cluster", "cluster id").withRequiredArg().ofType(classOf[String])

     parser.accepts("cpus", "cpu amount (0.5, 1, 2)").withRequiredArg().ofType(classOf[java.lang.Double])
     parser.accepts("mem", "mem amount in Mb").withRequiredArg().ofType(classOf[java.lang.Long])
@@ -865,7 +865,7 @@ object Cli {
   def handleList(expr: String, args: Array[String], help: Boolean = false): Unit = {
     val parser = newParser()
-    parser.accepts("cluster", "Cluster id").withRequiredArg().ofType(classOf[String])
+    parser.accepts("cluster", "cluster id").withRequiredArg().ofType(classOf[String])

     if (help) {
       printLine("List topics\nUsage: topic list [<topic-expr>] --cluster <id>\n")
@@ -919,7 +919,7 @@
       parser.accepts("replicas", "replicas count. Default - 1").withRequiredArg().ofType(classOf[Integer])
     }
     parser.accepts("options", "topic options. Example: flush.ms=60000,retention.ms=6000000").withRequiredArg().ofType(classOf[String])
-    parser.accepts("cluster", "Cluster id").withRequiredArg().ofType(classOf[String])
+    parser.accepts("cluster", "cluster id").withRequiredArg().ofType(classOf[String])

     if (help) {
       printLine(s"${cmd.capitalize} topic\nUsage: topic $cmd <topic-expr> [options]\n")
@@ -986,7 +986,7 @@
     parser.accepts("broker", "<broker-expr>. Default - *. See below.").withRequiredArg().ofType(classOf[String])
     parser.accepts("replicas", "replicas count. Default - 1").withRequiredArg().ofType(classOf[Integer])
     parser.accepts("timeout", "timeout (30s, 1m, 1h). 0s - no timeout").withRequiredArg().ofType(classOf[String])
-    parser.accepts("cluster", "Cluster id").withRequiredArg().ofType(classOf[String])
+    parser.accepts("cluster", "cluster id").withRequiredArg().ofType(classOf[String])

     if (help) {
       printLine("Rebalance topics\nUsage: topic rebalance <topic-expr>|status [options]\n")

From 7bd21e600d3c41cc98754a7f776e221ec7efc008 Mon Sep 17 00:00:00 2001
From: Andrii Biletskyi
Date: Fri, 1 Apr 2016 18:31:15 +0300
Subject: [PATCH 4/5] MK-2 - added readme

---
 README.md                                   | 99 +++++++++++++++++--
 src/scala/ly/stealth/mesos/kafka/Cli.scala  |  3 +
 .../ly/stealth/mesos/kafka/ClusterCli.scala | 12 ++-
 3 files changed, 105 insertions(+), 9 deletions(-)

diff --git a/README.md b/README.md
index d8c2fb5..2ef2e12 100644
--- a/README.md
+++ b/README.md
@@ -130,8 +130,24 @@ Starting and using 1 broker
 First let's start up and use 1 broker with the default settings. Further in the readme you can see how to change these from the defaults.

+Each broker must be part of a cluster. Adding clusters lets you logically separate Kafka nodes. A Kafka-mesos cluster has a unique id
+(specified on cluster creation) and a Zookeeper connection string - a Zookeeper url (with an optional chroot at the end) under which all Kafka metadata is stored.
+The Zookeeper connection string specified at the cluster level is inherited by brokers as the zk.connect config setting.
+
+To add a cluster execute:
+
+```
+# ./kafka-mesos.sh cluster add kafka-cluster-1 --zk-connect master:2181/kafka-1
+cluster added:
+  id: kafka-cluster-1
+  zk connection string: master:2181/kafka-1
+```
+(You can also update and remove clusters; these commands follow the same pattern as the `broker` command line interface.)
+
+Now you can add a broker:
+
 ```
-# ./kafka-mesos.sh broker add 0
+# ./kafka-mesos.sh broker add 0 --cluster kafka-cluster-1
 broker added:
   id: 0
   active: false
@@ -305,10 +321,10 @@ current limit is 100Kb no matter how many lines being requested.

 High Availability Scheduler State
 -------------------------

-The scheduler supports storing the cluster state in Zookeeper. It currently shares a znode within the mesos ensemble. To turn this on in properties
+The scheduler supports storing the cluster state in Zookeeper. To turn this on, set in properties:

 ```
-clusterStorage=zk:/kafka-mesos
+clusterStorage=zk:master:2181/kafka-mesos
 ```

 Failed Broker Recovery
@@ -521,6 +537,75 @@ Error: broker 1 timeout on start
 Navigating the CLI
 ==================

+Adding a cluster
+-------------------------------
+
+```
+# ./kafka-mesos.sh help cluster add
+Add cluster
+Usage: cluster add <id> [options]
+
+Option       Description
+------       -----------
+--zk-connect REQUIRED. Connection string to Kafka Zookeeper cluster. E.g.: 192.168.0.1:2181,192.168.0.2:
+               2181/kafka1
+
+Generic Options
+Option Description
+------ -----------
+--api  Api url. Example: http://master:7000
+```
+
+Updating a cluster
+-------------------------------
+
+```
+# ./kafka-mesos.sh help cluster update
+Update cluster
+Usage: cluster update <id> [options]
+
+Option       Description
+------       -----------
+--zk-connect REQUIRED. Connection string to Kafka Zookeeper cluster. E.g.: 192.168.0.1:2181,192.168.0.2:
+               2181/kafka1
+
+Generic Options
+Option Description
+------ -----------
+--api  Api url. Example: http://master:7000
+```
+
+Removing a cluster
+-------------------------------
+
+```
+# ./kafka-mesos.sh help cluster remove
+Remove cluster
+Usage: cluster remove <id>
+
+Generic Options
+Option Description
+------ -----------
+--api  Api url. Example: http://master:7000
+
+```
+
+Listing clusters
+-------------------------------
+
+```
+# ./kafka-mesos.sh help cluster list
+List clusters
+Usage: cluster list
+
+Generic Options
+Option Description
+------ -----------
+--api  Api url.
Example: http://master:7000
+
+```
+
+
 Adding brokers to the cluster
 -------------------------------
@@ -774,7 +859,7 @@ Listing Topics
 ```
 #./kafka-mesos.sh help topic list
 List topics
-Usage: topic list [<topic-expr>]
+Usage: topic list [<topic-expr>] --cluster <id>

 Generic Options
 Option Description
@@ -793,7 +878,7 @@ Adding Topic
 ```
 #./kafka-mesos.sh help topic add
 Add topic
-Usage: topic add <topic-expr> [options]
+Usage: topic add <topic-expr> [options] --cluster <id>

 Option Description
 ------ -----------
@@ -830,7 +915,7 @@ Updating Topic
 ```
 #./kafka-mesos.sh help topic update
 Update topic
-Usage: topic update <topic-expr> [options]
+Usage: topic update <topic-expr> [options] --cluster <id>

 Option Description
 ------ -----------
@@ -853,7 +938,7 @@ Rebalancing topics
 ```
 #./kafka-mesos.sh help topic rebalance
 Rebalance topics
-Usage: topic rebalance <topic-expr>|status [options]
+Usage: topic rebalance <topic-expr>|status [options] --cluster <id>

 Option Description
 ------ -----------

diff --git a/src/scala/ly/stealth/mesos/kafka/Cli.scala b/src/scala/ly/stealth/mesos/kafka/Cli.scala
index b12a8e7..fedfcd0 100644
--- a/src/scala/ly/stealth/mesos/kafka/Cli.scala
+++ b/src/scala/ly/stealth/mesos/kafka/Cli.scala
@@ -90,6 +90,8 @@ object Cli {
         BrokerCli.handle(subCmd, null, help = true)
       case "topic" =>
         TopicCli.handle(subCmd, null, help = true)
+      case "cluster" =>
+        ClusterCli.handle(subCmd, null, help = true)
       case _ =>
         throw new Error(s"unsupported command $cmd")
     }
@@ -144,6 +146,7 @@ object Cli {
     printLine("Commands:")
     printLine("help [cmd [cmd]] - print general or command-specific help", 1)
     if (SchedulerCli.isEnabled) printLine("scheduler - start scheduler", 1)
+    printLine("cluster - cluster management commands", 1)
     printLine("broker - broker management commands", 1)
     printLine("topic - topic management commands", 1)
   }

diff --git a/src/scala/ly/stealth/mesos/kafka/ClusterCli.scala b/src/scala/ly/stealth/mesos/kafka/ClusterCli.scala
index c2fba19..4ba5e4e 100644
--- a/src/scala/ly/stealth/mesos/kafka/ClusterCli.scala
+++ b/src/scala/ly/stealth/mesos/kafka/ClusterCli.scala
@@ -34,11 +34,19 @@ object ClusterCli {
     }
   }

+  private[kafka] def printCmds(): Unit = {
+    printLine("Commands:")
+    printLine("add - add cluster", 1)
+    printLine("update - update cluster configuration", 1)
+    printLine("remove - remove cluster", 1)
+    printLine("list - list existing clusters", 1)
+  }
+
   private def handleHelp(cmd: String): Unit = {
     cmd match {
       case null =>
         printLine("Cluster management commands\nUsage: cluster <command>\n")
-        printCmds()
+        ClusterCli.printCmds()
         printLine()

         printLine("Run `help cluster <command>` to see details of specific command")
@@ -55,7 +63,7 @@
   private def handleList(expr: String, help: Boolean = false): Unit = {
     if (help) {
-      printLine("List brokers\nUsage: cluster list\n")
+      printLine("List clusters\nUsage: cluster list\n")
       handleGenericOptions(null, help = true)

       printLine()

From c33f79e9a374017d7c64dba2a75136004931c32f Mon Sep 17 00:00:00 2001
From: Andrii Biletskyi
Date: Tue, 5 Apr 2016 23:57:27 +0300
Subject: [PATCH 5/5] MK-2 - minor fixes to zk storage type and add/update
 cluster logic

---
 kafka-mesos.properties                            | 2 +-
 src/scala/ly/stealth/mesos/kafka/HttpServer.scala | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/kafka-mesos.properties b/kafka-mesos.properties
index b8c980c..45c2748 100644
--- a/kafka-mesos.properties
+++ b/kafka-mesos.properties
@@ -3,7 +3,7 @@ debug=true

 user=vagrant

-storage=master:2181/chroot/mesos-kafka-scheduler
+storage=zk:master:2181

 master=master:5050

diff --git a/src/scala/ly/stealth/mesos/kafka/HttpServer.scala 
b/src/scala/ly/stealth/mesos/kafka/HttpServer.scala index 236c5a7..ebbb201 100644 --- a/src/scala/ly/stealth/mesos/kafka/HttpServer.scala +++ b/src/scala/ly/stealth/mesos/kafka/HttpServer.scala @@ -176,7 +176,7 @@ object HttpServer { if (!add && cluster.active) errors.add("cluster has active nodes") - if (add && zkConnect == null || zkConnect.trim.isEmpty) + if (add && (zkConnect == null || zkConnect.trim.isEmpty)) errors.add("zk connection string is empty") if (!errors.isEmpty) { response.sendError(400, errors.mkString("; ")); return }
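
A note on the `zkConnect: () => String` parameter that `Topics`, `Rebalancer` and `TestRebalancer` now take in the patches above: passing a thunk rather than a plain string means that a later `cluster update --zk-connect ...` is picked up on the next ZooKeeper call, without rewiring the components that hold the reference. A minimal, self-contained sketch of the pattern (the classes below are simplified stand-ins, not the actual `ly.stealth.mesos.kafka` types):

```
// Sketch of the thunk pattern; Cluster and Topics here are simplified stand-ins.
class Cluster(val id: String) {
  @volatile var zkConnect: String = null
}

class Topics(zkConnect: () => String) {
  // Every ZK operation re-reads the current connection string.
  def currentConnect: String = zkConnect()
}

object ThunkDemo extends App {
  val cluster = new Cluster("default")
  cluster.zkConnect = "master:2181/kafka-1"

  val topics = new Topics(() => cluster.zkConnect)
  println(topics.currentConnect)  // master:2181/kafka-1

  cluster.zkConnect = "master:2181/kafka-2"  // simulates `cluster update`
  println(topics.currentConnect)  // master:2181/kafka-2
}
```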
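The connection-string handling in `ZkStorage` (PATCH 1/5) splits on the first `/`: everything before it is the ZooKeeper host list, the remainder is the chroot that `createChrootIfRequired` pre-creates. A quick illustration of that split using the standard `String.span`:

```
object ZkSplitDemo extends App {
  // Same split as in ZkStorage: host list before the first '/', chroot after.
  val (zkConnect, path) = "master:2181/chroot/mesos-kafka-scheduler".span(_ != '/')
  println(zkConnect)  // master:2181
  println(path)       // /chroot/mesos-kafka-scheduler

  // With no chroot (e.g. storage=zk:master:2181) the path is empty,
  // so createChrootIfRequired does nothing.
  val (hosts, noChroot) = "master:2181".span(_ != '/')
  println(hosts)            // master:2181
  println(noChroot.isEmpty) // true
}
```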
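The tests serialize brokers with `toJson(expanded = true)` and read them back with `fromJson(..., expanded = true)`, but the `Broker.scala` hunks above do not show the JSON layout itself. A plausible reading, sketched with hypothetical simplified types (the field names and structure are assumptions, not the patch's actual format): with `expanded = true` the owning cluster is embedded in the broker JSON, so the executor side can reconstruct `broker.cluster` without access to the scheduler's `Nodes` registry, while the non-expanded form stores only the cluster id.

```
// Hypothetical sketch of referenced vs. expanded serialization.
case class ClusterRef(id: String, zkConnect: String)

case class BrokerRef(id: String, cluster: ClusterRef) {
  def toJson(expanded: Boolean = false): Map[String, Any] =
    if (expanded)
      // Embed the whole cluster: self-contained across process boundaries.
      Map("id" -> id,
          "cluster" -> Map("id" -> cluster.id, "zkConnect" -> cluster.zkConnect))
    else
      // Reference by id: compact, resolved against the registry on load.
      Map("id" -> id, "cluster" -> cluster.id)
}

object ExpandedJsonDemo extends App {
  val broker = BrokerRef("0", ClusterRef("default", "master:2181/kafka-1"))
  println(broker.toJson())                 // cluster referenced by id
  println(broker.toJson(expanded = true))  // cluster embedded
}
```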