MK-2 cluster support #193

Status: Open. Wants to merge 7 commits into base branch `master`.
99 changes: 92 additions & 7 deletions README.md
@@ -130,8 +130,24 @@ Starting and using 1 broker

First, let's start up and use one broker with the default settings. Later in this readme you can see how to change these defaults.

Each broker must be part of a cluster. Adding clusters lets you logically separate Kafka nodes. A kafka-mesos cluster has a unique id
(specified on cluster creation) and a Zookeeper connection string: a Zookeeper url (with an optional chroot at the end) used to store all Kafka metadata.
The Zookeeper connection string specified at the cluster level is inherited by its brokers as the zk.connect config setting.
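The inheritance described above can be sketched as follows. This is a minimal illustrative model with hypothetical `Cluster` and `Broker` case classes, not the scheduler's actual code:

```scala
// Sketch only: how a cluster-level Zookeeper connection string is
// inherited by each broker (the `zk.connect` inheritance described above).
case class Cluster(id: String, zkConnect: String)

case class Broker(id: String, cluster: Cluster,
                  options: Map[String, String] = Map.empty) {
  // Broker-level options are layered over the inherited cluster default,
  // so a broker may still override the connection string explicitly.
  def effectiveOptions: Map[String, String] =
    Map("zookeeper.connect" -> cluster.zkConnect) ++ options
}
```

With `Cluster("kafka-cluster-1", "master:2181/kafka-1")`, a broker added to that cluster resolves `zookeeper.connect` to `master:2181/kafka-1` unless it sets its own value.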

To add a cluster execute:

```
# ./kafka-mesos.sh cluster add kafka-cluster-1 --zk-connect master:2181/kafka-1
cluster added:
id: kafka-cluster-1
zk connection string: master:2181/kafka-1
```
(You can also update and remove clusters; see the respective examples in the CLI section, analogous to the `broker` commands.)

Now you can add a broker:

```
# ./kafka-mesos.sh broker add 0
# ./kafka-mesos.sh broker add 0 --cluster kafka-cluster-1
broker added:
id: 0
active: false
@@ -305,10 +321,10 @@ current limit is 100Kb no matter how many lines being requested.

High Availability Scheduler State
-------------------------
The scheduler supports storing the cluster state in Zookeeper. It currently shares a znode within the mesos ensemble. To turn this on in properties
The scheduler supports storing the cluster state in Zookeeper. To turn this on in properties

```
clusterStorage=zk:/kafka-mesos
clusterStorage=zk:master:2181/kafka-mesos
```
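The `zk:` storage value above encodes a Zookeeper connection string plus an optional chroot. A sketch of parsing that format, using a hypothetical helper rather than the scheduler's actual code:

```scala
// Hypothetical parser for "zk:<host:port[,host:port...]>[/chroot]".
case class ZkStorage(connect: String, chroot: String)

def parseClusterStorage(value: String): ZkStorage = {
  require(value.startsWith("zk:"), s"unsupported storage: $value")
  val rest = value.stripPrefix("zk:")
  val slash = rest.indexOf('/')
  if (slash < 0) ZkStorage(rest, "/")                  // no chroot given
  else ZkStorage(rest.take(slash), rest.drop(slash))   // split off chroot
}
```

For example, `parseClusterStorage("zk:master:2181/kafka-mesos")` yields connect `master:2181` and chroot `/kafka-mesos`.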

Failed Broker Recovery
@@ -521,6 +537,75 @@ Error: broker 1 timeout on start
Navigating the CLI
==================

Adding a cluster
-------------------------------

```
# ./kafka-mesos.sh help cluster add
Add cluster
Usage: cluster add <cluster-id> [options]

Option Description
------ -----------
--zk-connect REQUIRED. Connection string to Kafka Zookeeper cluster. E.g.: 192.168.0.1:2181,192.168.0.2:
2181/kafka1

Generic Options
Option Description
------ -----------
--api Api url. Example: http://master:7000
```

Updating a cluster
-------------------------------

```
# ./kafka-mesos.sh help cluster update
Update cluster
Usage: cluster update <cluster-id> [options]

Option Description
------ -----------
--zk-connect REQUIRED. Connection string to Kafka Zookeeper cluster. E.g.: 192.168.0.1:2181,192.168.0.2:
2181/kafka1

Generic Options
Option Description
------ -----------
--api Api url. Example: http://master:7000
```

Removing a cluster
-------------------------------

```
# ./kafka-mesos.sh help cluster remove
Remove cluster
Usage: cluster remove <cluster-id>

Generic Options
Option Description
------ -----------
--api Api url. Example: http://master:7000

```

Listing clusters
-------------------------------

```
# ./kafka-mesos.sh help cluster list
List clusters
Usage: cluster list

Generic Options
Option Description
------ -----------
--api Api url. Example: http://master:7000

```
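Taken together, the four `cluster` subcommands above behave like a small keyed registry. A sketch of that behavior as an in-memory model (not the scheduler's actual storage layer):

```scala
import scala.collection.mutable

// In-memory stand-in for the cluster registry the CLI manipulates:
// add rejects duplicates, update requires an existing id,
// remove and list behave as their names suggest.
class ClusterRegistry {
  private val clusters = mutable.LinkedHashMap[String, String]() // id -> zk connect

  def add(id: String, zkConnect: String): Unit = {
    require(!clusters.contains(id), s"cluster $id already exists")
    clusters(id) = zkConnect
  }
  def update(id: String, zkConnect: String): Unit = {
    require(clusters.contains(id), s"cluster $id not found")
    clusters(id) = zkConnect
  }
  def remove(id: String): Unit = clusters -= id
  def list: Seq[(String, String)] = clusters.toSeq
}
```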


Adding brokers to the cluster
-------------------------------

@@ -774,7 +859,7 @@ Listing Topics
```
#./kafka-mesos.sh help topic list
List topics
Usage: topic list [<topic-expr>]
Usage: topic list [<topic-expr>] --cluster <cluster-id>

Generic Options
Option Description
@@ -793,7 +878,7 @@ Adding Topic
```
#./kafka-mesos.sh help topic add
Add topic
Usage: topic add <topic-expr> [options]
Usage: topic add <topic-expr> [options] --cluster <cluster-id>

Option Description
------ -----------
@@ -830,7 +915,7 @@ Updating Topic
```
#./kafka-mesos.sh help topic update
Update topic
Usage: topic update <topic-expr> [options]
Usage: topic update <topic-expr> [options] --cluster <cluster-id>

Option Description
------ -----------
@@ -853,7 +938,7 @@ Rebalancing topics
```
#./kafka-mesos.sh help topic rebalance
Rebalance topics
Usage: topic rebalance <topic-expr>|status [options]
Usage: topic rebalance <topic-expr>|status [options] --cluster <cluster-id>

Option Description
------ -----------
4 changes: 1 addition & 3 deletions kafka-mesos.properties
@@ -3,12 +3,10 @@ debug=true

user=vagrant

storage=zk:/mesos-kafka-scheduler
storage=zk:master:2181

master=master:5050

zk=master:2181/chroot

#for testing on the vagrant master via ./kafka-mesos.sh scheduler
#you will eventually want to run this on a scheduler i.e marathon
#change the IP to what is service discoverable & routable for your setup
25 changes: 18 additions & 7 deletions src/scala/ly/stealth/mesos/kafka/Broker.scala
@@ -23,19 +23,18 @@ import org.apache.mesos.Protos.Resource.DiskInfo.Persistence
import org.apache.mesos.Protos.Volume.Mode

import scala.collection.JavaConversions._
import scala.collection
import org.apache.mesos.Protos.{Volume, Value, Resource, Offer}
import java.util._
import java.util.{TimeZone, Collections, UUID, Date}
import ly.stealth.mesos.kafka.Broker.{Metrics, Stickiness, Failover}
import ly.stealth.mesos.kafka.Util.BindAddress
import net.elodina.mesos.util.{Strings, Period, Range, Repr}
import java.text.SimpleDateFormat
import scala.List
import scala.collection.Map
import scala.util.parsing.json.JSONObject

class Broker(_id: String = "0") {
var id: String = _id
var cluster: Cluster = null

@volatile var active: Boolean = false

var cpus: Double = 1
@@ -58,6 +57,16 @@ class Broker(_id: String = "0") {
// broker has been modified while being in non stopped state, once stopped or before task launch becomes false
var needsRestart: Boolean = false

def this(id: String, cluster: Cluster) {
  this(id)
  this.cluster = cluster
}

def this(json: Map[String, Any], expanded: Boolean = false) = {
  this()
  fromJson(json, expanded)
}

def options(defaults: util.Map[String, String] = null): util.Map[String, String] = {
val result = new util.LinkedHashMap[String, String]()
if (defaults != null) result.putAll(defaults)
@@ -244,8 +253,9 @@
matches
}

def fromJson(node: Map[String, Object]): Unit = {
def fromJson(node: Map[String, Any], expanded: Boolean = false): Unit = {
id = node("id").asInstanceOf[String]
cluster = if (expanded) new Cluster(node("cluster").asInstanceOf[Map[String, Any]]) else Nodes.getCluster(node("cluster").asInstanceOf[String])
active = node("active").asInstanceOf[Boolean]

cpus = node("cpus").asInstanceOf[Number].doubleValue()
@@ -277,9 +287,10 @@
if (node.contains("needsRestart")) needsRestart = node("needsRestart").asInstanceOf[Boolean]
}

def toJson: JSONObject = {
def toJson(expanded: Boolean = false): JSONObject = {
val obj = new collection.mutable.LinkedHashMap[String, Any]()
obj("id") = id
obj("cluster") = if (expanded) cluster.toJson else cluster.id
obj("active") = active

obj("cpus") = cpus
@@ -316,7 +327,7 @@ object Broker {

def idFromExecutorId(executorId: String): String = idFromTaskId(executorId)

def isOptionOverridable(name: String): Boolean = !List("broker.id", "port", "zookeeper.connect").contains(name)
def isOptionOverridable(name: String): Boolean = !scala.List("broker.id", "port", "zookeeper.connect").contains(name)

class Stickiness(_period: Period = new Period("10m")) {
var period: Period = _period