[SPARK-6431][Streaming][Kafka] Error message for partition metadata requ...

...ests

The original reported problem was misdiagnosed; the topic just didn't exist yet.  Agreed upon solution was to improve error handling / message

Author: cody koeninger <cody@koeninger.org>

Closes #5454 from koeninger/spark-6431-master and squashes the following commits:

44300f8 [cody koeninger] [SPARK-6431][Streaming][Kafka] Error message for partition metadata requests
This commit is contained in:
cody koeninger 2015-04-12 17:37:30 +01:00 committed by Sean Owen
parent ddc17431a4
commit 6ac8eea2fc
2 changed files with 14 additions and 3 deletions

View file

@ -123,9 +123,17 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
val errs = new Err
withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
val resp: TopicMetadataResponse = consumer.send(req)
// error codes here indicate missing / just created topic,
// repeating on a different broker wont be useful
val respErrs = resp.topicsMetadata.filter(m => m.errorCode != ErrorMapping.NoError)
if (respErrs.isEmpty) {
return Right(resp.topicsMetadata.toSet)
} else {
respErrs.foreach { m =>
val cause = ErrorMapping.exceptionFor(m.errorCode)
val msg = s"Error getting partition metadata for '${m.topic}'. Does the topic exist?"
errs.append(new SparkException(msg, cause))
}
}
}
Left(errs)
}

View file

@ -52,6 +52,9 @@ class KafkaClusterSuite extends FunSuite with BeforeAndAfterAll {
val parts = kc.getPartitions(Set(topic)).right.get
assert(parts(topicAndPartition), "didn't get partitions")
val err = kc.getPartitions(Set(topic + "BAD"))
assert(err.isLeft, "getPartitions for a nonexistant topic should be an error")
}
test("leader offset apis") {