Getting Started with the Apache Pulsar Java Client

DataStax
Building Real-World, Real-Time AI
6 min read · Sep 23, 2022


Author: Mary Grygleski

Get hands-on with Apache Pulsar’s Java client and create a message producer and consumer.

Apache Pulsar is an open-source, distributed messaging and streaming platform originally created at Yahoo. Pulsar is built on a publish-subscribe (pub-sub) model: brokers move data between producers and consumers, and bookies (Apache BookKeeper storage nodes) persist it.

Producers and consumers communicate with Pulsar through a client library. Pulsar provides client libraries for several programming languages, so you can build producers and consumers in the language of your choice. For Java applications, that library is the Java client.

In this article, we’ll use the Apache Pulsar Java client to create a producer that writes to a specific topic and a consumer that reads from that topic.

How to Use the Pulsar Java Client

This demo uses Pulsar standalone running in Docker, together with the Pulsar Java client.

Prerequisites

Make sure Docker Desktop is installed. If it isn’t, download it and follow the installation instructions.

To start a Pulsar standalone container, open a terminal and run this command:

$ docker run -it -p 6650:6650 -p 8080:8080 --mount source=pulsardata,target=/pulsar/data --mount source=pulsarconf,target=/pulsar/conf apachepulsar/pulsar:2.10.1 bin/pulsar standalone

Note:

  • If you’re using Windows, configure Docker Desktop to use Linux containers.
  • If you’re using macOS, add the --platform=linux/amd64 flag to the docker run command to specify the OS and architecture of the image.

To confirm that the container is running, open a new terminal and run the following command:

$ docker ps

If the Pulsar container appears in the list, you can move on to the next step: creating a Maven project.

Creating a Producer

To create a Java client producer, you first need to set up a Maven project and add the Pulsar dependency:

<!-- in your <properties> block -->
<pulsar.version>2.10.1</pulsar.version>

<!-- in your <dependencies> block -->
<dependency>
  <groupId>org.apache.pulsar</groupId>
  <artifactId>pulsar-client</artifactId>
  <version>${pulsar.version}</version>
</dependency>

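To get started, create a producer class named PulsarProducer with a main method that wraps the client code in a try/catch block for PulsarClientException. Java rejects a catch for a checked exception that the try block can never throw, so this skeleton won’t compile until the next steps add the client code: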

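import org.apache.pulsar.client.api.PulsarClientException;

public class PulsarProducer {
    public static void main(String[] args) {
        try {
            // The client and producer code from the next steps goes here
        } catch (PulsarClientException e) {
            e.printStackTrace();
        }
    }
}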

Before you can create a producer, instantiate a PulsarClient object:

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();

The serviceUrl() method specifies the broker the client connects to. Here the broker URL is pulsar://localhost:6650, where 6650 is the port you exposed when you started the standalone container.
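Hard-coding the URL is fine for a local demo. Outside a demo, you might read it from the environment and fall back to the standalone default. The sketch below assumes a hypothetical environment variable named PULSAR_SERVICE_URL, which is a name chosen for this example and is not something Pulsar reads itself. It also assumes that only the binary-protocol schemes should be accepted: pulsar:// for plain connections and pulsar+ssl:// for TLS.

```java
import java.net.URI;
import java.util.Map;

// Hypothetical helper that picks the value passed to PulsarClient.builder().serviceUrl(...).
final class ServiceUrls {
    // Default for the standalone container, whose port 6650 is mapped to the host.
    static final String DEFAULT_URL = "pulsar://localhost:6650";

    // PULSAR_SERVICE_URL is an assumed variable name, not a Pulsar setting.
    static String resolve(Map<String, String> env) {
        String url = env.getOrDefault("PULSAR_SERVICE_URL", DEFAULT_URL);
        String scheme = URI.create(url).getScheme();
        // Reject URLs that are not Pulsar binary-protocol URLs (e.g. the HTTP admin port).
        if (!"pulsar".equals(scheme) && !"pulsar+ssl".equals(scheme)) {
            throw new IllegalArgumentException("Not a Pulsar service URL: " + url);
        }
        return url;
    }

    public static void main(String[] args) {
        System.out.println(resolve(System.getenv()));
    }
}
```

You would then build the client with PulsarClient.builder().serviceUrl(ServiceUrls.resolve(System.getenv())).build().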

Next, create a producer for a specific Pulsar topic:

// Create a new producer and specify the topic it sends messages to
Producer<String> stringProducer = client.newProducer(Schema.STRING)
        .topic("starter-topic")
        .create();
// Send the message to the broker
stringProducer.send("My message");
// Close the producer once you are done with it
stringProducer.close();

By default, a producer sends messages as byte arrays. You can change this by specifying a message schema. Here, Schema.STRING makes this a Producer<String> that sends strings.
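The schema is what converts between your Java type and the bytes stored in the topic. Calling client.newProducer() with no schema gives you a Producer<byte[]>. Schema.STRING, by contrast, encodes strings as UTF-8 by default. Because the Schema interface exposes encode() and decode() directly, you can check this round trip without a running broker. The sketch assumes only that the pulsar-client dependency is on the classpath. The class name StringSchemaDemo is arbitrary.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.apache.pulsar.client.api.Schema;

final class StringSchemaDemo {
    public static void main(String[] args) {
        String text = "My message";
        // The bytes a Producer<String> built with Schema.STRING sends to the broker.
        byte[] payload = Schema.STRING.encode(text);
        // Compare with the UTF-8 bytes you would send yourself through a Producer<byte[]>.
        boolean sameAsUtf8 = Arrays.equals(payload, text.getBytes(StandardCharsets.UTF_8));
        // The String a Consumer<String> built with Schema.STRING returns from getValue().
        String decoded = Schema.STRING.decode(payload);
        System.out.println(sameAsUtf8 + " " + decoded);
    }
}
```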

The topic the producer writes to is passed to the topic() method.
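Pulsar expands short topic names using defaults. A bare name such as starter-topic becomes persistent://public/default/starter-topic, which is a persistent topic in the public tenant’s default namespace. A three-part name such as my-tenant/my-ns/orders only gains the persistent:// prefix. The client library performs this resolution for you. The helper below is a hypothetical sketch that mirrors the rule so you can see which fully qualified topic your code writes to. It is not Pulsar’s own TopicName class, and it ignores other legacy name formats.

```java
// Hypothetical helper mirroring Pulsar's short topic name expansion (not Pulsar's TopicName).
final class TopicNames {
    static String fullyQualified(String topic) {
        if (topic.contains("://")) {
            return topic; // already fully qualified, e.g. persistent://... or non-persistent://...
        }
        String[] parts = topic.split("/", -1);
        for (String part : parts) {
            if (part.isEmpty()) {
                throw new IllegalArgumentException("Empty segment in topic name: " + topic);
            }
        }
        if (parts.length == 1) {
            // Bare name: persistent topic in the "public" tenant and "default" namespace.
            return "persistent://public/default/" + topic;
        }
        if (parts.length == 3) {
            // tenant/namespace/name: only the topic type is filled in.
            return "persistent://" + topic;
        }
        throw new IllegalArgumentException("Unsupported topic name: " + topic);
    }

    public static void main(String[] args) {
        System.out.println(fullyQualified("starter-topic"));
    }
}
```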

After sending your messages, it’s considered best practice to close the producer. Close the client as well once you no longer need it, so that their resources are released.

With the necessary imports added, the full producer code looks like this:

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;

public class PulsarProducer {
    public static void main(String[] args) {
        try {
            // Connect to the standalone broker exposed on port 6650
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650")
                    .build();
            // Create a producer that sends strings to starter-topic
            Producer<String> stringProducer = client.newProducer(Schema.STRING)
                    .topic("starter-topic")
                    .create();
            stringProducer.send("My message");
            // Release the producer, then the client
            stringProducer.close();
            client.close();
        } catch (PulsarClientException e) {
            e.printStackTrace();
        }
    }
}
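PulsarClient and Producer both implement java.io.Closeable. That means a try-with-resources statement can close them for you, even if send() throws. The sketch below assumes the same standalone broker and topic. The class name PulsarProducerTwr is only for this example. Resources are closed in reverse order of declaration, so the producer is closed before the client it depends on.

```java
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;

final class PulsarProducerTwr {
    public static void main(String[] args) {
        // Both resources are closed automatically when the block exits.
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl("pulsar://localhost:6650")
                     .build();
             Producer<String> producer = client.newProducer(Schema.STRING)
                     .topic("starter-topic")
                     .create()) {
            // send() blocks until the broker acknowledges the message and returns its MessageId.
            producer.send("My message");
        } catch (PulsarClientException e) {
            e.printStackTrace();
        }
    }
}
```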

Now you can test whether your producer works by running a consumer inside the Pulsar standalone container. Open a terminal and start a shell in the container:

$ docker exec -it <container id> /bin/bash

Move into the bin directory:

$ cd bin

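Execute this command to consume from starter-topic. Start it before you run the producer. By default, a new subscription begins at the latest message, so anything published before the subscription exists is not delivered to it. Also by default, the command exits after receiving one message; pass -n 0 to keep consuming.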

$ ./pulsar-client consume starter-topic -s "my-subscription"

The terminal output shows that the client has been created and is ready to start handling messages. It also shows the client connecting to the broker, along with the details of starter-topic under topicNames.

Now let’s create a consumer to see how the producer you’ve just created works.

Creating a Consumer

We’ll now create a consumer using the Pulsar Java client. The consumer created in this section subscribes to “starter-topic”, the topic the producer above writes to.

As with the producer, start by creating a consumer class with a main method that wraps the client code in a try/catch block for PulsarClientException:

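import org.apache.pulsar.client.api.PulsarClientException;

public class PulsarConsumer {
    public static void main(String[] args) {
        try {
            // The client and consumer code from the next steps goes here
        } catch (PulsarClientException e) {
            e.printStackTrace();
        }
    }
}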

The next step is to instantiate the PulsarClient object.

PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();

Now, create a new consumer that reads from starter-topic through the subscription named my-subscription.

Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("starter-topic")
.subscriptionName("my-subscription")
.subscribe();

The subscribe() method subscribes the consumer to the specified topic and subscription. The receive() method blocks until a message is available. To keep the consumer listening on the topic, call it in a while loop.

while (true) {
    // Wait until a message is available
    Message<String> msg = consumer.receive();
    try {
        // Print the message that has been received
        System.out.println("Message received: " + msg.getValue());
        // Acknowledge the message so that the broker can delete it
        consumer.acknowledge(msg);
    } catch (Exception e) {
        // Processing failed: ask the broker to redeliver the message later
        consumer.negativeAcknowledge(msg);
    }
}

Inside the loop, the consumer handles each message according to whether processing succeeds. On success, it prints the message and sends an acknowledgment so that the broker can delete it. If processing throws an exception, the consumer negatively acknowledges the message so that the broker redelivers it later.
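A while (true) loop runs until you stop the process. For a quick test, it can be handy to stop once the topic goes quiet. The sketch below uses the receive(timeout, unit) overload, which returns null when no message arrives in time. It also sets subscriptionInitialPosition(SubscriptionInitialPosition.Earliest). With that setting, a brand-new subscription starts from the earliest message still available in the topic instead of the default, Latest. The 5-second timeout, the class name, and the subscription name my-bounded-subscription are arbitrary choices for this example. A separate name keeps this consumer from clashing with one already attached to my-subscription.

```java
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;

final class BoundedPulsarConsumer {
    public static void main(String[] args) {
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl("pulsar://localhost:6650")
                     .build();
             Consumer<String> consumer = client.newConsumer(Schema.STRING)
                     .topic("starter-topic")
                     // Hypothetical subscription name used only for this example
                     .subscriptionName("my-bounded-subscription")
                     // A new subscription starts at the earliest available message
                     .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                     .subscribe()) {
            while (true) {
                // Wait up to 5 seconds; null means no message arrived in that window
                Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
                if (msg == null) {
                    break;
                }
                try {
                    System.out.println("Message received: " + msg.getValue());
                    consumer.acknowledge(msg);
                } catch (Exception e) {
                    // Ask the broker to redeliver this message later
                    consumer.negativeAcknowledge(msg);
                }
            }
        } catch (PulsarClientException e) {
            e.printStackTrace();
        }
    }
}
```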

Here’s the full code of the consumer:

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;

public class PulsarConsumer {
    public static void main(String[] args) {
        try {
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650")
                    .build();
            Consumer<String> consumer = client.newConsumer(Schema.STRING)
                    .topic("starter-topic")
                    .subscriptionName("my-subscription")
                    .subscribe();
            while (true) {
                // Wait until a message is available
                Message<String> msg = consumer.receive();
                try {
                    // Print the message that has been received
                    System.out.println("Message received: " + msg.getValue());
                    // Acknowledge the message so that the broker can delete it
                    consumer.acknowledge(msg);
                } catch (Exception e) {
                    // Processing failed: ask the broker to redeliver the message later
                    consumer.negativeAcknowledge(msg);
                }
            }
        } catch (PulsarClientException e) {
            e.printStackTrace();
        }
    }
}

That’s it! You now have a producer and a consumer.

Let’s run a quick test to confirm that the consumer reads what the producer writes. Start the consumer first, then run the producer from the Maven project you’ve just created.

“My message” is the message we sent when we first created the producer. If the consumer works, it prints Message received: My message.

Now edit the producer class to change the message it sends to starter-topic:

Producer<String> stringProducer = client.newProducer(Schema.STRING)
        .topic("starter-topic")
        .create();
stringProducer.send("Hello Pulsar. Enjoy Your Day! From Producer");
stringProducer.close();

When you run the producer again, the consumer console should print Message received: Hello Pulsar. Enjoy Your Day! From Producer.

You’ve now confirmed that the Pulsar Java client enables the communication between our producer and consumer through the brokers.

Conclusion

Pulsar supports creating producers and consumers in different programming languages through its client libraries. For Java applications, we use the Pulsar Java client.

Consumers and producers must follow a few cardinal rules to publish and consume messages successfully:

  • A PulsarClient that specifies the broker’s service URL must be instantiated before a producer or consumer can work.
  • A producer must specify the topic it writes to before sending a message; it reaches the broker through the client.
  • A consumer must specify the topic and a subscription name to consume messages; like the producer, it reaches the broker through the client.

The Pulsar Java client can be used to build applications that rely on real-time data. For instance, it could handle fraud alerts in financial systems or power an e-commerce application that makes recommendations based on user activity.

Follow the DataStax Tech Blog for more developer stories. Check out our YouTube channel for tutorials and follow DataStax Developers on Twitter for the latest news about our developer community.

Resources

  1. Apache Pulsar client libraries
  2. Apache Pulsar Java client
  3. Pulsar standalone
  4. Apache Maven
