TopJ

Performance Test

Kafka 2.13-4.0.0

Test Class

We are using the following code to test throughput latencies of the Kafka broker on localhost:9092:

import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Properties; import java.util.concurrent.ExecutionException; public class SimpleKafkaProducer { public static void main(String[] args) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.ACKS_CONFIG, "all"); try (KafkaProducer producer = new KafkaProducer<>(props)) { for (int i = 1; i <= 100; i++) { String key = "key-" + i; String value = "message-" + i; ProducerRecord record = new ProducerRecord<>("test-topic", key, value); RecordMetadata metadata = producer.send(record).get(); System.out.printf("Sent record(key=%s value=%s) meta(partition=%d, offset=%d)%n", record.key(), record.value(), metadata.partition(), metadata.offset()); } } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } }

The command line with problematic packages omitted is:

java -javaagent:TopJ.jar=1,20,'^(?!.*(org.apache.logging.slf4j.Log4jLoggerFactory|com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer|scala.collection.immutable.|scala.collection.convert.JavaCollectionWrapper|org.apache.kafka.common.record.DefaultRecordBatch|org.apache.kafka.common.feature.SupportedVersionRange)).*$',err -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=logs/kafkaServer-gc.log:time,tags:filecount=10,filesize=100M -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=logs -Dlog4j2.configurationFile=config/log4j2.yaml -cp libs/*:/opt/kafka/libs/* kafka.Kafka config/server.properties

Results

Boot-time exploratory:

[TopJ] Top 20 Method Timing Stats for 4 Threads (sorted by avg time) [TopJ] kafka.Kafka.main: max=1181.50 ms, total=1181.50 ms, avg=1181.50 ms, calls=1 [TopJ] scala.collection.mutable.TreeMap$.empty: max=549.10 ms, total=549.10 ms, avg=274.55 ms, calls=2 [TopJ] com.fasterxml.jackson.databind.ser.std.NumberSerializers.addAll: max=169.00 ms, total=169.00 ms, avg=169.00 ms, calls=1 [TopJ] com.fasterxml.jackson.databind.ObjectMapper._newReader: max=162.14 ms, total=162.14 ms, avg=162.14 ms, calls=1 [TopJ] com.fasterxml.jackson.databind.ser.std.StdJdkSerializers.all: max=138.71 ms, total=138.71 ms, avg=138.71 ms, calls=1 [TopJ] kafka.Kafka$.getPropsFromArgs: max=112.73 ms, total=112.73 ms, avg=112.73 ms, calls=1 [TopJ] scala.collection.mutable.TreeMap.keySet: max=110.71 ms, total=110.71 ms, avg=110.71 ms, calls=1 [TopJ] kafka.raft.KafkaRaftManager.buildRaftClient: max=108.99 ms, total=108.99 ms, avg=108.99 ms, calls=1 [TopJ] org.apache.kafka.metadata.util.BatchFileReader$Builder.build: max=79.35 ms, total=79.35 ms, avg=79.35 ms, calls=1 [TopJ] kafka.Kafka$.buildServer: max=77.61 ms, total=77.61 ms, avg=77.61 ms, calls=1 [TopJ] kafka.network.SocketServer.createDataPlaneAcceptor: max=76.55 ms, total=76.55 ms, avg=76.55 ms, calls=1 [TopJ] com.fasterxml.jackson.databind.ObjectMapper.defaultClassIntrospector: max=203.12 ms, total=203.13 ms, avg=67.71 ms, calls=3 [TopJ] com.fasterxml.jackson.dataformat.yaml.YAMLFactory._createParser: max=66.12 ms, total=66.12 ms, avg=66.12 ms, calls=1 [TopJ] scala.collection.ArrayOps$.contains$extension: max=123.52 ms, total=123.52 ms, avg=61.76 ms, calls=2 [TopJ] org.apache.logging.log4j.core.config.yaml.YamlConfigurationFactory.getConfiguration: max=60.97 ms, total=60.97 ms, avg=60.97 ms, calls=1 [TopJ] org.apache.kafka.common.requests.ApiVersionsResponse.collectApis: max=55.99 ms, total=55.99 ms, avg=55.99 ms, calls=1 [TopJ] org.pcollections.IntTree.iterator: max=46.68 ms, total=46.68 ms, avg=46.68 ms, calls=1 [TopJ] scala.collection.MapOps$$anon$1.iterator: max=45.70 ms, total=45.70 ms, avg=45.70 ms, calls=1 [TopJ] com.fasterxml.jackson.databind.node.ObjectNode.toString: max=109.52 ms, total=109.52 ms, avg=36.51 ms, calls=3 [TopJ] com.fasterxml.jackson.databind.introspect.BasicClassIntrospector.constructPropertyCollector: max=34.77 ms, total=34.77 ms, avg=34.77 ms, calls=1

Initial findings are issues with scala collections. scala.collection.mutable.TreeMap$.empty returns an iterable of an empty TreeMap yet takes over 250ms. Scala is not looking a great choice by Apache to implement with this. But these are just boot-time timings and are generally only warm-up one-offs, so let's test run-time and focus on scala.collection.

The prompt is now:

java -javaagent:TopJ.jar=5,20,'^(?=.*(?:scala\.collection\.))(?!scala\.collection\.LinearSeq$)(?!scala\.collection\.immutable\.)(?!scala\.collection\.convert\.JavaCollectionWrapper).*',err -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=logs/kafkaServer-gc.log:time,tags:filecount=10,filesize=100M -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=logs -Dlog4j2.configurationFile=config/log4j2.yaml -cp libs/*:/opt/kafka/libs/* kafka.Kafka config/server.properties

And we get this after boot is complete:

[TopJ] Top 20 Method Timing Stats for 24 Threads (sorted by avg time) [TopJ] scala.collection.mutable.TreeMap$.empty: max=928.10 ms, total=928.11 ms, avg=464.05 ms, calls=2 [TopJ] scala.collection.mutable.TreeMap.keySet: max=144.25 ms, total=144.25 ms, avg=144.25 ms, calls=1 [TopJ] scala.collection.Iterator.map: max=230.66 ms, total=230.67 ms, avg=76.89 ms, calls=3 [TopJ] scala.collection.MapOps$KeySet.iterator: max=77.78 ms, total=152.61 ms, avg=30.52 ms, calls=5 [TopJ] scala.collection.IterableOnceOps.toBuffer: max=29.98 ms, total=29.98 ms, avg=29.98 ms, calls=1 [TopJ] scala.collection.Iterator.flatMap: max=137.43 ms, total=137.46 ms, avg=27.49 ms, calls=5 [TopJ] scala.collection.Iterator.filterImpl: max=184.40 ms, total=344.84 ms, avg=24.63 ms, calls=14 [TopJ] scala.collection.ArrayOps$.contains$extension: max=78.05 ms, total=78.08 ms, avg=19.52 ms, calls=4 [TopJ] scala.collection.MapOps$$anon$1.iterator: max=106.18 ms, total=168.59 ms, avg=16.86 ms, calls=10 [TopJ] scala.collection.MapOps$KeySet.filterNot: max=114.85 ms, total=114.91 ms, avg=16.42 ms, calls=7 [TopJ] scala.collection.SeqFactory$Delegate.newBuilder: max=15.13 ms, total=15.13 ms, avg=15.13 ms, calls=1 [TopJ] scala.collection.IterableOnceOps.toSet: max=720.98 ms, total=721.96 ms, avg=13.13 ms, calls=55 [TopJ] scala.collection.mutable.HashSet.concat: max=71.51 ms, total=71.56 ms, avg=7.95 ms, calls=9 [TopJ] scala.collection.mutable.ListBuffer$.empty: max=190.11 ms, total=190.34 ms, avg=7.05 ms, calls=27 [TopJ] scala.collection.mutable.HashMap$.from: max=231.03 ms, total=231.58 ms, avg=6.81 ms, calls=34 [TopJ] scala.collection.convert.AsJavaExtensions.BufferHasAsJava: max=13.13 ms, total=13.13 ms, avg=6.56 ms, calls=2 [TopJ] scala.collection.mutable.TreeMap.get: max=50.70 ms, total=50.73 ms, avg=6.34 ms, calls=8 [TopJ] scala.collection.Iterator.concat: max=65.80 ms, total=65.85 ms, avg=5.49 ms, calls=12 [TopJ] scala.collection.MapOps$KeySet.iterableFactory: max=37.19 ms, total=37.22 ms, avg=5.32 ms, calls=7

With this produced after SimpleKafkaProducer has been run:

[TopJ] Top 20 Method Timing Stats for 41 Threads (sorted by avg time) [TopJ] scala.collection.mutable.TreeMap$.empty: max=928.10 ms, total=928.11 ms, avg=464.05 ms, calls=2 [TopJ] scala.collection.Iterator.collect: max=165.43 ms, total=165.43 ms, avg=165.43 ms, calls=1 [TopJ] scala.collection.mutable.TreeMap.keySet: max=144.25 ms, total=144.25 ms, avg=144.25 ms, calls=1 [TopJ] scala.collection.ArrayOps$.contains$extension: max=78.05 ms, total=78.08 ms, avg=19.52 ms, calls=4 [TopJ] scala.collection.MapOps$KeySet.filterNot: max=114.85 ms, total=114.91 ms, avg=16.42 ms, calls=7 [TopJ] scala.collection.SeqFactory$Delegate.newBuilder: max=15.13 ms, total=15.13 ms, avg=15.13 ms, calls=1 [TopJ] scala.collection.IterableOnceOps.toBuffer: max=29.98 ms, total=29.98 ms, avg=14.99 ms, calls=2 [TopJ] scala.collection.Iterator.filterImpl: max=184.40 ms, total=344.92 ms, avg=12.77 ms, calls=27 [TopJ] scala.collection.Iterator.flatMap: max=137.43 ms, total=137.50 ms, avg=12.50 ms, calls=11 [TopJ] scala.collection.mutable.HashSet.concat: max=71.51 ms, total=71.56 ms, avg=7.95 ms, calls=9 [TopJ] scala.collection.mutable.TreeMap.get: max=50.70 ms, total=50.73 ms, avg=6.34 ms, calls=8 [TopJ] scala.collection.IterableOnceOps.toSet: max=720.98 ms, total=722.68 ms, avg=4.57 ms, calls=158 [TopJ] scala.collection.convert.AsJavaExtensions.BufferHasAsJava: max=13.13 ms, total=13.13 ms, avg=4.38 ms, calls=3 [TopJ] scala.collection.convert.AsScalaExtensions$ListHasAsScala.asScala: max=443.84 ms, total=444.35 ms, avg=3.73 ms, calls=119 [TopJ] scala.collection.mutable.RedBlackTree$Tree$.empty: max=5.81 ms, total=5.81 ms, avg=2.91 ms, calls=2 [TopJ] scala.collection.convert.AsScalaExtensions.ConcurrentMapHasAsScala: max=8.29 ms, total=8.30 ms, avg=2.77 ms, calls=3 [TopJ] scala.collection.ArrayOps$.map$extension: max=5.69 ms, total=10.85 ms, avg=2.71 ms, calls=4 [TopJ] scala.collection.StringOps$.r$extension: max=2.65 ms, total=2.65 ms, avg=2.65 ms, calls=1 [TopJ] scala.collection.convert.AsJavaExtensions.IterableHasAsJava: max=7.85 ms, total=7.86 ms, avg=1.97 ms, calls=4

Some of this Scala code is surprisingly slow for what it's actually doing. We're not talking significant load here, just 100 messages.

Solution

Change Scala code to Java. It's cludging up the JVM for the sake of slightly shorter code. Scala probably has it's use in some applications, but this is definitely not one of them from a performance perspective.
DeployJ.com