Performance Test
Kafka 2.13-4.0.0
Test Class
We use the following code to test the throughput and latency of the Kafka broker on localhost:9092:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class SimpleKafkaProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // Wait for the full set of in-sync replicas to acknowledge each record.
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        // The type parameters are required: with raw types, send(record).get()
        // returns Object and cannot be assigned to RecordMetadata.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 1; i <= 100; i++) {
                String key = "key-" + i;
                String value = "message-" + i;
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("test-topic", key, value);
                // get() blocks, so each record is acknowledged before the next is sent.
                RecordMetadata metadata = producer.send(record).get();
                System.out.printf("Sent record(key=%s value=%s) meta(partition=%d, offset=%d)%n",
                        record.key(), record.value(),
                        metadata.partition(), metadata.offset());
            }
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}
The broker command line, with problematic packages excluded from instrumentation, is:
java -javaagent:TopJ.jar=1,20,'^(?!.*(org.apache.logging.slf4j.Log4jLoggerFactory|com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer|scala.collection.immutable.|scala.collection.convert.JavaCollectionWrapper|org.apache.kafka.common.record.DefaultRecordBatch|org.apache.kafka.common.feature.SupportedVersionRange)).*$',err -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=logs/kafkaServer-gc.log:time,tags:filecount=10,filesize=100M -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=logs -Dlog4j2.configurationFile=config/log4j2.yaml -cp libs/*:/opt/kafka/libs/* kafka.Kafka config/server.properties
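To sanity-check what this filter lets through, the pattern can be run against a few names with java.util.regex. This is only a sketch: the class name BootFilterCheck and the helper isInstrumented are made up for illustration, and it assumes TopJ applies the pattern to fully qualified class names, which is not confirmed here.

```java
import java.util.regex.Pattern;

final class BootFilterCheck {
    // Pattern copied verbatim from the -javaagent argument above.
    // Assumption: TopJ tests it against fully qualified class names.
    static final Pattern FILTER = Pattern.compile(
        "^(?!.*(org.apache.logging.slf4j.Log4jLoggerFactory"
        + "|com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer"
        + "|scala.collection.immutable."
        + "|scala.collection.convert.JavaCollectionWrapper"
        + "|org.apache.kafka.common.record.DefaultRecordBatch"
        + "|org.apache.kafka.common.feature.SupportedVersionRange)).*$");

    // True when the name passes the filter, i.e. contains none of the excluded fragments.
    static boolean isInstrumented(String className) {
        return FILTER.matcher(className).matches();
    }

    public static void main(String[] args) {
        String[] names = {
            "kafka.Kafka",
            "scala.collection.mutable.TreeMap$",
            "scala.collection.immutable.List",
            "org.apache.kafka.common.record.DefaultRecordBatch"
        };
        for (String name : names) {
            System.out.println(name + " -> " + isInstrumented(name));
        }
    }
}
```

The first two names pass and the last two are rejected. Note that the dots in the pattern are unescaped, so each one matches any character and the exclusions are slightly broader than the literal package names.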
Results
Boot-time exploratory:
[TopJ] Top 20 Method Timing Stats for 4 Threads (sorted by avg time)
[TopJ] kafka.Kafka.main: max=1181.50 ms, total=1181.50 ms, avg=1181.50 ms, calls=1
[TopJ] scala.collection.mutable.TreeMap$.empty: max=549.10 ms, total=549.10 ms, avg=274.55 ms, calls=2
[TopJ] com.fasterxml.jackson.databind.ser.std.NumberSerializers.addAll: max=169.00 ms, total=169.00 ms, avg=169.00 ms, calls=1
[TopJ] com.fasterxml.jackson.databind.ObjectMapper._newReader: max=162.14 ms, total=162.14 ms, avg=162.14 ms, calls=1
[TopJ] com.fasterxml.jackson.databind.ser.std.StdJdkSerializers.all: max=138.71 ms, total=138.71 ms, avg=138.71 ms, calls=1
[TopJ] kafka.Kafka$.getPropsFromArgs: max=112.73 ms, total=112.73 ms, avg=112.73 ms, calls=1
[TopJ] scala.collection.mutable.TreeMap.keySet: max=110.71 ms, total=110.71 ms, avg=110.71 ms, calls=1
[TopJ] kafka.raft.KafkaRaftManager.buildRaftClient: max=108.99 ms, total=108.99 ms, avg=108.99 ms, calls=1
[TopJ] org.apache.kafka.metadata.util.BatchFileReader$Builder.build: max=79.35 ms, total=79.35 ms, avg=79.35 ms, calls=1
[TopJ] kafka.Kafka$.buildServer: max=77.61 ms, total=77.61 ms, avg=77.61 ms, calls=1
[TopJ] kafka.network.SocketServer.createDataPlaneAcceptor: max=76.55 ms, total=76.55 ms, avg=76.55 ms, calls=1
[TopJ] com.fasterxml.jackson.databind.ObjectMapper.defaultClassIntrospector: max=203.12 ms, total=203.13 ms, avg=67.71 ms, calls=3
[TopJ] com.fasterxml.jackson.dataformat.yaml.YAMLFactory._createParser: max=66.12 ms, total=66.12 ms, avg=66.12 ms, calls=1
[TopJ] scala.collection.ArrayOps$.contains$extension: max=123.52 ms, total=123.52 ms, avg=61.76 ms, calls=2
[TopJ] org.apache.logging.log4j.core.config.yaml.YamlConfigurationFactory.getConfiguration: max=60.97 ms, total=60.97 ms, avg=60.97 ms, calls=1
[TopJ] org.apache.kafka.common.requests.ApiVersionsResponse.collectApis: max=55.99 ms, total=55.99 ms, avg=55.99 ms, calls=1
[TopJ] org.pcollections.IntTree.iterator: max=46.68 ms, total=46.68 ms, avg=46.68 ms, calls=1
[TopJ] scala.collection.MapOps$$anon$1.iterator: max=45.70 ms, total=45.70 ms, avg=45.70 ms, calls=1
[TopJ] com.fasterxml.jackson.databind.node.ObjectNode.toString: max=109.52 ms, total=109.52 ms, avg=36.51 ms, calls=3
[TopJ] com.fasterxml.jackson.databind.introspect.BasicClassIntrospector.constructPropertyCollector: max=34.77 ms, total=34.77 ms, avg=34.77 ms, calls=1
Initial findings point to issues with the Scala collections. scala.collection.mutable.TreeMap$.empty only has to return an empty TreeMap, yet it averages over 250 ms per call. Scala does not look like a great choice by Apache for this implementation. However, these are just boot-time timings and are generally warm-up one-offs, so let's test at run time and focus on scala.collection.
The command line is now:
java -javaagent:TopJ.jar=5,20,'^(?=.*(?:scala\.collection\.))(?!scala\.collection\.LinearSeq$)(?!scala\.collection\.immutable\.)(?!scala\.collection\.convert\.JavaCollectionWrapper).*',err -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=logs/kafkaServer-gc.log:time,tags:filecount=10,filesize=100M -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=logs -Dlog4j2.configurationFile=config/log4j2.yaml -cp libs/*:/opt/kafka/libs/* kafka.Kafka config/server.properties
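This narrower pattern combines one positive lookahead (the name must contain scala.collection.) with three negative lookaheads anchored at the start of the name. A small sketch of how it behaves, again assuming TopJ matches against fully qualified class names; ScalaFilterCheck and isInstrumented are hypothetical names used only for this check:

```java
import java.util.regex.Pattern;

final class ScalaFilterCheck {
    // Pattern copied from the -javaagent argument above, with backslashes
    // doubled for a Java string literal.
    static final Pattern FILTER = Pattern.compile(
        "^(?=.*(?:scala\\.collection\\.))"
        + "(?!scala\\.collection\\.LinearSeq$)"
        + "(?!scala\\.collection\\.immutable\\.)"
        + "(?!scala\\.collection\\.convert\\.JavaCollectionWrapper)"
        + ".*");

    // True when the name contains "scala.collection." and hits none of the exclusions.
    static boolean isInstrumented(String className) {
        return FILTER.matcher(className).matches();
    }

    public static void main(String[] args) {
        String[] names = {
            "scala.collection.mutable.TreeMap$",
            "scala.collection.LinearSeq",
            "scala.collection.LinearSeqOps",
            "scala.collection.immutable.List",
            "kafka.Kafka"
        };
        for (String name : names) {
            System.out.println(name + " -> " + isInstrumented(name));
        }
    }
}
```

Because of the trailing $ in the LinearSeq exclusion, only the exact name scala.collection.LinearSeq is rejected; a longer name such as scala.collection.LinearSeqOps still passes, whereas the immutable and JavaCollectionWrapper exclusions act as prefixes.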
And we get this after boot is complete:
[TopJ] Top 20 Method Timing Stats for 24 Threads (sorted by avg time)
[TopJ] scala.collection.mutable.TreeMap$.empty: max=928.10 ms, total=928.11 ms, avg=464.05 ms, calls=2
[TopJ] scala.collection.mutable.TreeMap.keySet: max=144.25 ms, total=144.25 ms, avg=144.25 ms, calls=1
[TopJ] scala.collection.Iterator.map: max=230.66 ms, total=230.67 ms, avg=76.89 ms, calls=3
[TopJ] scala.collection.MapOps$KeySet.iterator: max=77.78 ms, total=152.61 ms, avg=30.52 ms, calls=5
[TopJ] scala.collection.IterableOnceOps.toBuffer: max=29.98 ms, total=29.98 ms, avg=29.98 ms, calls=1
[TopJ] scala.collection.Iterator.flatMap: max=137.43 ms, total=137.46 ms, avg=27.49 ms, calls=5
[TopJ] scala.collection.Iterator.filterImpl: max=184.40 ms, total=344.84 ms, avg=24.63 ms, calls=14
[TopJ] scala.collection.ArrayOps$.contains$extension: max=78.05 ms, total=78.08 ms, avg=19.52 ms, calls=4
[TopJ] scala.collection.MapOps$$anon$1.iterator: max=106.18 ms, total=168.59 ms, avg=16.86 ms, calls=10
[TopJ] scala.collection.MapOps$KeySet.filterNot: max=114.85 ms, total=114.91 ms, avg=16.42 ms, calls=7
[TopJ] scala.collection.SeqFactory$Delegate.newBuilder: max=15.13 ms, total=15.13 ms, avg=15.13 ms, calls=1
[TopJ] scala.collection.IterableOnceOps.toSet: max=720.98 ms, total=721.96 ms, avg=13.13 ms, calls=55
[TopJ] scala.collection.mutable.HashSet.concat: max=71.51 ms, total=71.56 ms, avg=7.95 ms, calls=9
[TopJ] scala.collection.mutable.ListBuffer$.empty: max=190.11 ms, total=190.34 ms, avg=7.05 ms, calls=27
[TopJ] scala.collection.mutable.HashMap$.from: max=231.03 ms, total=231.58 ms, avg=6.81 ms, calls=34
[TopJ] scala.collection.convert.AsJavaExtensions.BufferHasAsJava: max=13.13 ms, total=13.13 ms, avg=6.56 ms, calls=2
[TopJ] scala.collection.mutable.TreeMap.get: max=50.70 ms, total=50.73 ms, avg=6.34 ms, calls=8
[TopJ] scala.collection.Iterator.concat: max=65.80 ms, total=65.85 ms, avg=5.49 ms, calls=12
[TopJ] scala.collection.MapOps$KeySet.iterableFactory: max=37.19 ms, total=37.22 ms, avg=5.32 ms, calls=7
This is the output after SimpleKafkaProducer has been run:
[TopJ] Top 20 Method Timing Stats for 41 Threads (sorted by avg time)
[TopJ] scala.collection.mutable.TreeMap$.empty: max=928.10 ms, total=928.11 ms, avg=464.05 ms, calls=2
[TopJ] scala.collection.Iterator.collect: max=165.43 ms, total=165.43 ms, avg=165.43 ms, calls=1
[TopJ] scala.collection.mutable.TreeMap.keySet: max=144.25 ms, total=144.25 ms, avg=144.25 ms, calls=1
[TopJ] scala.collection.ArrayOps$.contains$extension: max=78.05 ms, total=78.08 ms, avg=19.52 ms, calls=4
[TopJ] scala.collection.MapOps$KeySet.filterNot: max=114.85 ms, total=114.91 ms, avg=16.42 ms, calls=7
[TopJ] scala.collection.SeqFactory$Delegate.newBuilder: max=15.13 ms, total=15.13 ms, avg=15.13 ms, calls=1
[TopJ] scala.collection.IterableOnceOps.toBuffer: max=29.98 ms, total=29.98 ms, avg=14.99 ms, calls=2
[TopJ] scala.collection.Iterator.filterImpl: max=184.40 ms, total=344.92 ms, avg=12.77 ms, calls=27
[TopJ] scala.collection.Iterator.flatMap: max=137.43 ms, total=137.50 ms, avg=12.50 ms, calls=11
[TopJ] scala.collection.mutable.HashSet.concat: max=71.51 ms, total=71.56 ms, avg=7.95 ms, calls=9
[TopJ] scala.collection.mutable.TreeMap.get: max=50.70 ms, total=50.73 ms, avg=6.34 ms, calls=8
[TopJ] scala.collection.IterableOnceOps.toSet: max=720.98 ms, total=722.68 ms, avg=4.57 ms, calls=158
[TopJ] scala.collection.convert.AsJavaExtensions.BufferHasAsJava: max=13.13 ms, total=13.13 ms, avg=4.38 ms, calls=3
[TopJ] scala.collection.convert.AsScalaExtensions$ListHasAsScala.asScala: max=443.84 ms, total=444.35 ms, avg=3.73 ms, calls=119
[TopJ] scala.collection.mutable.RedBlackTree$Tree$.empty: max=5.81 ms, total=5.81 ms, avg=2.91 ms, calls=2
[TopJ] scala.collection.convert.AsScalaExtensions.ConcurrentMapHasAsScala: max=8.29 ms, total=8.30 ms, avg=2.77 ms, calls=3
[TopJ] scala.collection.ArrayOps$.map$extension: max=5.69 ms, total=10.85 ms, avg=2.71 ms, calls=4
[TopJ] scala.collection.StringOps$.r$extension: max=2.65 ms, total=2.65 ms, avg=2.65 ms, calls=1
[TopJ] scala.collection.convert.AsJavaExtensions.IterableHasAsJava: max=7.85 ms, total=7.86 ms, avg=1.97 ms, calls=4
Some of this Scala code is surprisingly slow for what it's actually doing. We're not talking significant load here, just 100 messages.
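When reading these tables it helps to know that avg is simply total divided by calls, so one slow call can dominate an entry. The sketch below parses a TopJ line and also computes the average of all calls except the slowest one. It assumes the line layout shown in the output above; the class TopJLineStats and its methods are hypothetical helpers, not part of TopJ.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class TopJLineStats {
    // Layout assumed from the output above:
    // "[TopJ] <method>: max=<n> ms, total=<n> ms, avg=<n> ms, calls=<n>"
    private static final Pattern LINE = Pattern.compile(
        "^\\[TopJ\\] (\\S+): max=([0-9.]+) ms, total=([0-9.]+) ms, "
        + "avg=([0-9.]+) ms, calls=(\\d+)$");

    final String method;
    final double maxMs;
    final double totalMs;
    final double avgMs;
    final long calls;

    private TopJLineStats(String method, double maxMs, double totalMs, double avgMs, long calls) {
        this.method = method;
        this.maxMs = maxMs;
        this.totalMs = totalMs;
        this.avgMs = avgMs;
        this.calls = calls;
    }

    // Parses one report line; throws if the line does not have the assumed layout.
    static TopJLineStats parse(String line) {
        Matcher m = LINE.matcher(line.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a TopJ stats line: " + line);
        }
        return new TopJLineStats(m.group(1),
            Double.parseDouble(m.group(2)), Double.parseDouble(m.group(3)),
            Double.parseDouble(m.group(4)), Long.parseLong(m.group(5)));
    }

    // Mean of all calls except the single slowest one; NaN when there was only one call.
    double avgExcludingMaxMs() {
        return calls > 1 ? (totalMs - maxMs) / (calls - 1) : Double.NaN;
    }

    public static void main(String[] args) {
        TopJLineStats s = parse("[TopJ] scala.collection.IterableOnceOps.toSet: "
            + "max=720.98 ms, total=722.68 ms, avg=4.57 ms, calls=158");
        System.out.printf(Locale.ROOT,
            "%s: total/calls=%.2f ms, avg without slowest call=%.4f ms%n",
            s.method, s.totalMs / s.calls, s.avgExcludingMaxMs());
    }
}
```

For the toSet line in the last table, total/calls reproduces the reported 4.57 ms, while the remaining 157 calls average roughly 0.011 ms each. The inputs are rounded to two decimals, so the second figure is only approximate.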
Solution
Change the Scala code to Java. It is clogging up the JVM for the sake of slightly shorter code. Scala probably has its uses in some applications, but from a performance perspective this is definitely not one of them.