Skip navigation links

Azure SDK for Java Reference Documentation

Current version is 5.0.0-beta.6, click here for the index

See: Description

Azure Event Hubs 
Package Description
com.azure.messaging.eventhubs
Package containing classes for creating EventHubProducerAsyncClient, EventHubProducerClient, EventHubConsumerAsyncClient, EventHubConsumerClient, or EventProcessorClient to perform operations on Azure Event Hubs.
com.azure.messaging.eventhubs.models
Package containing classes used for creating and configuring events that are being sent-to and received-from Azure Event Hubs service.
Current version is 5.0.0-beta.6, click here for the index

Azure Event Hubs client library for Java

Azure Event Hubs is a highly scalable publish-subscribe service that can ingest millions of events per second and stream them to multiple consumers. This lets you process and analyze the massive amounts of data produced by your connected devices and applications. Once Event Hubs has collected the data, you can retrieve, transform, and store it by using any real-time analytics provider or with batching/storage adapters. If you would like to know more about Azure Event Hubs, you may wish to review: What is Event Hubs?

The Azure Event Hubs client library allows for publishing and consuming of Azure Event Hubs events and may be used to:

Source code | API reference documentation | Product documentation | Samples

Table of contents

Getting started

Prerequisites

Adding the package to your product

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs</artifactId>
    <version>5.0.0-preview.5</version>
</dependency>

Default SSL library

All client libraries, by default, use the Tomcat-native Boring SSL library to enable native-level performance for SSL operations. The Boring SSL library is an uber jar containing native libraries for Linux/macOS/Windows, and provides better performance compared to the default SSL implementation within the JDK. For more information, including how to reduce the dependency size, refer to the performance tuning section of the wiki.

Methods to authorize with Event Hubs

For the Event Hubs client library to interact with an Event Hub, it will need to understand how to connect and authorize with it.

Create an Event Hub producer using a connection string

The easiest means for doing so is to use a connection string, which is created automatically when creating an Event Hubs namespace. If you aren't familiar with shared access policies in Azure, you may wish to follow the step-by-step guide to get an Event Hubs connection string.

Both the asynchronous and synchronous Event Hub producer and consumer clients can be created using EventHubClientBuilder. Invoking buildAsyncProducerClient or buildProducerClient() will build the asynchronous or synchronous producers. Similarly, buildAsyncConsumerClient() or buildConsumerClient() will build the appropriate consumers.

The snippet below creates a synchronous Event Hub producer.

String connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
String eventHubName = "<< NAME OF THE EVENT HUB >>";
EventHubProducerClient producer = new EventHubClientBuilder()
    .connectionString(connectionString, eventHubName)
    .buildProducerClient();

Create an Event Hub client using Microsoft identity platform (formerly Azure Active Directory)

Azure SDK for Java supports an Azure Identity package, making it simple get credentials from Microsoft identity platform. First, add the package:

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-identity</artifactId>
    <version>1.0.0</version>
</dependency>

All the implemented ways to request a credential can be found under the com.azure.identity.credential package. The sample below shows how to use an Azure Active Directory (AAD) application client secret to authorize with Azure Event Hubs.

Authorizing with AAD application client secret

Follow the instructions in Creating a service principal using Azure Portal to create a service principal and a client secret. The corresponding clientId and tenantId for the service principal can be obtained from the App registration page.

ClientSecretCredential credential = new ClientSecretCredentialBuilder
    .clientId("<< APPLICATION (CLIENT) ID >>")
    .clientSecret("<< APPLICATION SECRET >>")
    .tenantId("<< DIRECTORY (TENANT) ID >>")
    .build();

// The fully qualified namespace for the Event Hubs instance. This is likely to be similar to:
// {your-namespace}.servicebus.windows.net
String fullyQualifiedNamespace = "my-test-eventhubs.servicebus.windows.net";
String eventHubName = "<< NAME OF THE EVENT HUB >>";
EventHubProducerClient client = new EventHubClientBuilder()
    .credential(fullyQualifiedNamespace, eventHubName, credential)
    .buildProducerClient();

Key concepts

For more concepts and deeper discussion, see: Event Hubs Features. Also, the concepts for AMQP are well documented in OASIS Advanced Messaging Queuing Protocol AMQP Version 1.0.

Examples

Publish events to an Event Hub

To publish events, you'll need to create an asynchronous EventHubProducerAsyncClient or a synchronous EventHubProducerClient. Each producer can send events to either, a specific partition, or allow the Event Hubs service to decide which partition events should be published to. It is recommended to use automatic routing when the publishing of events needs to be highly available or when event data should be distributed evenly among the partitions.

Create an Event Hub producer and publish events

Developers can create a producer by calling buildAsyncProducerClient or buildProducerClient(). If buildProducerClient() is invoked, a synchronous EventHubProducerClient is created. If buildAsyncProducerClient() is used, an asynchronous EventHubProducerAsyncClient is returned.

Specifying CreateBatchOptions.setPartitionId(String) will send events to a specific partition. If not specified, will allow for automatic partition routing. In addition, specifying CreateBatchOptions.setPartitionKey(String) will tell Event Hubs service to hash the events and send them to the same partition.

The snippet below creates a synchronous producer and sends events to any partition, allowing Event Hubs service to route the event to an available partition.

EventHubProducerClient producer = new EventHubClientBuilder()
    .connectionString("<< CONNECTION STRING FOR SPECIFIC EVENT HUB INSTANCE >>")
    .buildProducerClient();

List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));
EventDataBatch eventDataBatch = producer.createBatch();

for (EventData eventData : allEvents) {
    if (!eventDataBatch.tryAdd(eventData)) {
        producer.send(eventDataBatch);
        eventDataBatch = producer.createBatch();

        // Try to add that event that couldn't fit before.
        if (!eventDataBatch.tryAdd(eventData)) {
            throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
                + eventDataBatch.getMaxSizeInBytes());
        }
    }
}

// send the last batch of remaining events
if (eventDataBatch.getCount() > 0) {
    producer.send(eventDataBatch);
}

To send events to a particular partition, set the optional parameter setPartitionId(String) on CreateBatchOptions.

Publish events using partition identifier

Many Event Hub operations take place within the scope of a specific partition. Because partitions are owned by the Event Hub, their names are assigned at the time of creation. To understand what partitions are available, you can use the getPartitionIds function to get the ids of all available partitions in your Event Hub instance. All clients created using EventHubsClientBuilder can query for metadata about the Event Hub using getPartitionIds() or getEventHubProperties().

EventHubProducerClient producer = new EventHubClientBuilder()
    .connectionString("<< CONNECTION STRING FOR SPECIFIC EVENT HUB INSTANCE >>")
    .buildProducerClient();

CreateBatchOptions options = new CreateBatchOptions().setPartitionId("0");
EventDataBatch batch = producer.createBatch(options);

// Add events to batch and when you want to send the batch, send it using the producer.
producer.send(batch);

Publish events using partition key

When an Event Hub producer is not associated with any specific partition, it may be desirable to request that the Event Hubs service keep different events or batches of events together on the same partition. This can be accomplished by setting a partition key when publishing the events.

CreateBatchOptions batchOptions = new CreateBatchOptions.setPartitionKey("grouping-key");
EventDataBatch eventDataBatch = producer.createBatch(batchOptions);

// Add events to batch and when you want to send the batch, send it using the producer.
producer.send(eventDataBatch);

Consume events from an Event Hub partition

In order to consume events, you'll need to create an EventHubConsumerAsyncClient or EventHubConsumerClient for a specific consumer group. When an Event Hub is created, it starts with a default consumer group that can be used to get started. A consumer also needs to specify where in the event stream to begin receiving events.

Consume events with EventHubConsumerAsyncClient

In the snippet below, we are creating an asynchronous consumer that receives events from partitionId and only listens to newest events that get pushed to the partition by invoking receiveFromPartitionString, EventPosition. Developers can begin receiving events from multiple partitions using the same EventHubConsumerAsyncClient by calling receiveFromPartition(String, EventPosition) with another partition id, and subscribing to that Flux.

EventHubConsumerAsyncClient consumer = new EventHubClientBuilder()
    .connectionString("<< CONNECTION STRING FOR SPECIFIC EVENT HUB INSTANCE >>")
    .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
    .buildAsyncConsumerClient();

// Receive events from partition with id "0", only getting events that are newly added to the partition.
consumer.receiveFromPartition("0", EventPosition.latest()).subscribe(event -> {
    // Process each event as it arrives.
});

Consume events with EventHubConsumerClient

Developers can create a synchronous consumer that returns events in batches using an EventHubConsumerClient. In the snippet below, a consumer is created that starts reading events from the beginning of the partition's event stream.

EventHubConsumerClient consumer = new EventHubClientBuilder
    .connectionString("<< CONNECTION STRING FOR SPECIFIC EVENT HUB INSTANCE >>")
    .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
    .buildConsumerClient();

String partitionId = "<< EVENT HUB PARTITION ID >>";

// Get the first 15 events in the stream, or as many events as can be received within 40 seconds.
IterableStream<PartitionEvent> events = consumer.receiveFromPartition(partitionId, 15,
    EventPosition.earliest(), Duration.ofSeconds(40));
for (PartitionEvent event : events) {
    System.out.println("Event: " + event.getData().getBodyAsString());
}

Consume events using an EventProcessorClient

To consume events for all partitions of an Event Hub, you'll create an EventProcessorClient for a specific consumer group. When an Event Hub is created, it provides a default consumer group that can be used to get started.

The EventProcessorClient will delegate processing of events to a callback function that you provide, allowing you to focus on the logic needed to provide value while the processor holds responsibility for managing the underlying consumer operations.

In our example, we will focus on building the EventProcessorClient, use the InMemoryCheckpointStore available in samples, and a callback function that processes events received from the Event Hub and writes to console.

class Program {
    public static void mainString[] args {
        EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
            .consumerGroup("<< CONSUMER GROUP NAME >>")
            .connectionString("<< EVENT HUB CONNECTION STRING >>")
            .checkpointStore(new InMemoryCheckpointStore())
            .processEvent(eventContext -> {
                System.out.println("Partition id = " + eventContext.getPartitionContext().getPartitionId() + " and "
                    + "sequence number of event = " + eventContext.getEventData().getSequenceNumber());
            })
            .processError(errorContext -> {
                System.out.println("Error occurred while processing events " + errorContext.getThrowable().getMessage());
            })
            .buildEventProcessorClient();

        // This will start the processor. It will start processing events from all partitions.
        eventProcessorClient.start();

        // (for demo purposes only - adding sleep to wait for receiving events)
        TimeUnit.SECONDS.sleep(2);

        // When the user wishes to stop processing events, they can call `stop()`.
        eventProcessorClient.stop();
    }
}

Troubleshooting

Enable client logging

You can set the AZURE_LOG_LEVEL environment variable to view logging statements made in the client library. For example, setting AZURE_LOG_LEVEL=2 would show all informational, warning, and error log messages. The log levels can be found here: log levels.

Enable AMQP transport logging

If enabling client logging is not enough to diagnose your issues. You can enable logging to a file in the underlying AMQP library, Qpid Proton-J. Qpid Proton-J uses java.util.logging. You can enable logging by create a configuration file with the contents below. Or set proton.trace.level=ALL and whichever configuration options you want for the java.util.logging.Handler implementation. Implementation classes and their options can be found in Java 8 SDK javadoc.

Sample "logging.config" file

The configuration file below logs trace output from proton-j to the file "proton-trace.log".

handlers=java.util.logging.FileHandler
.level=OFF
proton.trace.level=ALL
java.util.logging.FileHandler.level=ALL
java.util.logging.FileHandler.pattern=proton-trace.log
java.util.logging.FileHandler.formatter=java.util.logging.SimpleFormatter
java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %3$s %4$s: %5$s %n

Common exceptions

AMQP exception

This is a general exception for AMQP related failures, which includes the AMQP errors as ErrorCondition and the context that caused this exception as ErrorContext. 'isTransient' is a boolean indicating if the exception is a transient error or not. If true, then the request can be retried; otherwise not.

AmqpErrorCondition contains error conditions common to the AMQP protocol and used by Azure services. When an AMQP exception is thrown, examining the error condition field can inform developers as to why the AMQP exception occurred and if possible, how to mitigate this exception. A list of all the AMQP exceptions can be found in OASIS AMQP Version 1.0 Transport Errors.

The AmqpErrorContext in the AmqpException provides information about the AMQP session, link, or connection that the exception occurred in. This is useful to diagnose which level in the transport this exception occurred at and whether it was an issue in one of the producers or consumers.

The recommended way to solve the specific exception the AMQP exception represents is to follow the Event Hubs Messaging Exceptions guidance.

Operation cancelled exception

It occurs when the underlying AMQP layer encounters an abnormal link abort or the connection is disconnected in an unexpected fashion. It is recommended to attempt to verify the current state and retry if necessary.

Message size exceeded

Event data, both individual and in batches, have a maximum size allowed. This includes the data of the event, as well as any associated metadata and system overhead. The best approach for resolving this error is to reduce the number of events being sent in a batch or the size of data included in the message. Because size limits are subject to change, please refer to Azure Event Hubs quotas and limits for specifics.

Other exceptions

For detailed information about these and other exceptions that may occur, please refer to Event Hubs Messaging Exceptions.

Next steps

Beyond those discussed, the Azure Event Hubs client library offers support for many additional scenarios to help take advantage of the full feature set of the Azure Event Hubs service. In order to help explore some of the these scenarios, the following set of sample is available:

Contributing

If you would like to become an active contributor to this project please refer to our Contribution Guidelines for more information.

Impressions

Skip navigation links
Visit the Azure for Java Developerssite for more Java documentation, including quick starts, tutorials, and code samples.

Copyright © 2019 Microsoft Corporation. All rights reserved.