Kafka server integration

NexJ applications support sending and receiving messages through Apache Kafka.

Kafka is a distributed streaming platform that enables applications to publish, subscribe to, and process streams of records. It combines the advantages of queuing and publish-subscribe messaging systems, and provides high throughput, partitioning, replication, and fault tolerance. For more information about Kafka, see the Kafka documentation at https://kafka.apache.org/documentation/.

You can create Kafka channels and Kafka channel connections in the Integration and Deployment layers of NexJ Studio, respectively. You can configure Kafka channels to send messages to a Kafka server, receive messages from a Kafka server, or both. Sender channels determine the Kafka topic to send the messages to based on the channel configuration settings. Likewise, receiver channels determine how to process incoming messages based on the service bindings for the channel connections.

To set up communication between your NexJ application and the Kafka server:

  1. Create a Kafka channel.
  2. Configure the Kafka channel connection in the environment file. For information about how to configure a Kafka channel connection and for an example of a typical configuration, see Configuring a Kafka connection and Kafka channel connection settings in the NexJ CRM deployment information.
  3. Redeploy the application.

Creating a Kafka channel

To use Kafka to send and receive messages from your NexJ CRM application, you must add one or more Kafka channels. You can configure each Kafka channel to send, receive, or both send and receive messages.

To create a Kafka channel:

  1. In the Integration layer, click the Channels tab.
  2. Right-click inside the Channels list and select New Channel.
    The New Channel dialog opens.
  3. In the Name field, enter a meaningful name for your channel, for example, kafkaChannel.
  4. In the Channel Type list, select Kafka.
  5. Click Finish.
    The channel editor opens.

Next, configure the channel by defining the channel attributes on the Overview and Service Binding tabs. For reference, a set of example settings follows. Field-specific information is available in NexJ Studio.

Kafka channel settings

The following table provides an example Kafka channel configuration for reference.

Kafka channel settings for a typical NexJ CRM deployment

Each entry lists the attribute, an example value, and notes.

Attribute: acknowledgement
Value: lazy

Message acknowledgement mode. The options are:

  • auto
    Waits for server acknowledgement immediately after sending a message. Failed messages result in an exception.
  • lazy
    Best-effort sending that defers the server acknowledgement until the transaction is committed. Failed messages may not result in an exception.

Applies to sender channels only.

Attribute: encoding
Value: UTF-8

Character set for serializing and deserializing string messages.

Attribute: errorTopic
Value: kafkaErrorTopic

The topic to which messages that encounter an error during processing are sent.

Applies to receiver channels only.

For messages to be sent to the error topic, the channel must be configured to send messages. Specifying an error topic without enabling send results in a validation exception.

If unspecified, the message is not sent to an error topic, but the error details are written to the log.

Attribute: groupId
Value: NexJ:Finance:kafkaChannel

The consumer group ID.

Defaults to <namespace>:<model name>:<channel name> if unspecified.

Applies to receiver channels only.

Attribute: latestReset
Value: false

Indicates what the receiver should do if no offset is specified in Kafka or if the specified offset is no longer valid:

  • true
    Automatically resets the offset to the latest offset in the Kafka topic being read from.
  • false
    Automatically resets the offset to the earliest offset in the Kafka topic being read from.

Applies to receiver channels only.

Attribute: maxPollInterval
Value: 3600000

Maximum interval between poll requests (in milliseconds) before the Kafka cluster considers the consumer to have failed.

Applies to receiver channels only.

Attribute: maxPollRecords
Value: 5

Maximum number of records returned by a poll request.

Applies to receiver channels only.

Attribute: maxReceivers
Value: 4

Maximum number of Kafka receiver threads per server.

Applies to receiver channels only.

Attribute: maxSenders
Value: 16

Maximum number of Kafka producers (senders) per server. For unlimited producers, enter -1 as the value.

Applies to sender channels only.

Attribute: pollTimeout
Value: 600000

The time (in milliseconds) spent waiting in poll if data is not available in the buffer.

If set to 0, the poll returns immediately with any records currently available in the buffer, or with none.

Applies to receiver channels only.

Attribute: receive
Value: Not applicable

Set to true to configure the channel for receiving messages.

Attribute: requestTimeout
Value: 25000

Timeout (in milliseconds) for Kafka requests.

Attribute: send
Value: Not applicable

Set to true to configure the channel for sending messages.

Attribute: topic
Value: kafkaTopic1

For sender channels, the Kafka topic to send to.
For receiver channels, the Kafka topic to receive from.

If the channel receives input from multiple topics, you can enter a space-separated list of topics.

If the channel is used for both sending and receiving, it sends messages to the first topic in the list and receives messages from all the topics in the list, including the first one.

Attribute: warningTimeout
Value: 2000

Number of milliseconds before a timeout warning occurs for long-running queries.

For infinite timeout, specify 0 as the value.
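
Several of the rules in these settings interact: the topic list, the default group ID, and the error topic validation. The following Python sketch models those three rules only. It is an illustration, not NexJ code: the KafkaChannelSettings class, its method names, and the second topic name kafkaTopic2 are hypothetical.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class KafkaChannelSettings:
    """Hypothetical stand-in for a Kafka channel's settings (not a NexJ class)."""
    name: str
    topic: str                          # space-separated list of topics
    send: bool = False
    receive: bool = False
    error_topic: Optional[str] = None
    group_id: Optional[str] = None

    def topics(self) -> List[str]:
        return self.topic.split()

    def send_topic(self) -> Optional[str]:
        # A sending channel uses the first topic in the list.
        names = self.topics()
        return names[0] if self.send and names else None

    def receive_topics(self) -> List[str]:
        # A receiving channel reads every topic in the list, including the first.
        return self.topics() if self.receive else []

    def effective_group_id(self, namespace: str, model: str) -> str:
        # Default is <namespace>:<model name>:<channel name> when unspecified.
        return self.group_id or f"{namespace}:{model}:{self.name}"

    def validate(self) -> None:
        # An error topic can only be used if the channel is allowed to send.
        if self.error_topic and not self.send:
            raise ValueError("errorTopic requires send to be enabled")


channel = KafkaChannelSettings(
    name="kafkaChannel",
    topic="kafkaTopic1 kafkaTopic2",    # kafkaTopic2 is a made-up second topic
    send=True,
    receive=True,
    error_topic="kafkaErrorTopic",
)
channel.validate()
print(channel.send_topic())                           # kafkaTopic1
print(channel.receive_topics())                       # ['kafkaTopic1', 'kafkaTopic2']
print(channel.effective_group_id("NexJ", "Finance"))  # NexJ:Finance:kafkaChannel
```

A receive-only channel with an error topic would fail validate() in this sketch, mirroring the validation exception described for errorTopic.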

You can define some attributes both at the channel level and the channel connection level. Channel connection settings override any individual channel settings.
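
The override rule can be pictured as a simple merge in which connection-level values win. The Python sketch below is only an illustration: the effective_settings function is hypothetical, and the choice of maxReceivers as the overridden attribute is an assumption, because this section does not list which attributes can be set at the connection level.

```python
def effective_settings(channel: dict, connection: dict) -> dict:
    """Return the channel settings with connection-level values taking precedence."""
    merged = dict(channel)       # start from the channel-level values
    merged.update(connection)    # connection-level values override them
    return merged


channel_level = {"maxReceivers": 4, "maxPollRecords": 5, "requestTimeout": 25000}
connection_level = {"maxReceivers": 8}   # hypothetical per-environment override

settings = effective_settings(channel_level, connection_level)
print(settings["maxReceivers"])    # 8
print(settings["maxPollRecords"])  # 5
```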

Additional configuration for sending Avro messages

Kafka messages are often sent in the Avro message format. If your NexJ application uses the Avro format to send Kafka messages, the Kafka channel requires the following additional configuration.

To reduce payload size, Avro separates data from the schema. For the NexJ application to send Avro data with a schema that the receiving application recognizes, you must add the schema registry URL property to the Kafka sender channel connection settings and set its value to the location of the Confluent schema registry that the receiving application uses. The first time a channel connection sends a message of a given type, NexJ uploads the schema for that message type to the Confluent schema registry. The receiving application then deserializes the data from the NexJ application based on the schema IDs specified in the message.

For more information about Confluent schema registry, see https://docs.confluent.io/current/schema-registry/docs/index.html.
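
To show how a schema ID can travel with a message, the following Python sketch uses the framing that the standard Confluent serializers apply: one zero "magic" byte, a 4-byte big-endian schema ID, then the Avro-encoded payload. Whether NexJ uses exactly this framing is an assumption; the source states only that the message specifies the schema ID. The frame and unframe function names are hypothetical, and the payload bytes are placeholders rather than real Avro output.

```python
import struct

MAGIC_BYTE = 0  # first byte of a Confluent-framed message


def frame(schema_id: int, avro_payload: bytes) -> bytes:
    """Prefix an Avro payload with the magic byte and the schema ID."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_payload


def unframe(message: bytes):
    """Split a framed message into (schema_id, avro_payload)."""
    if len(message) < 5:
        raise ValueError("message too short to contain a schema ID")
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("unexpected magic byte")
    return schema_id, message[5:]


message = frame(42, b"\x02hi")   # placeholder payload, schema ID 42
schema_id, payload = unframe(message)
print(schema_id)                 # 42
```

A receiver would use the extracted schema ID to look up the schema in the registry before decoding the payload.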

To specify the schema registry location for Kafka messages:

  1. In the Deployment layer of NexJ Studio, open the environment file for the deployment that will be using Avro messages.
  2. Click the Channel Connections tab for the environment.
  3. In the Channel Connections area, select the KafkaConnection that will send the Avro messages.
  4. In the Properties tab, under Sender Properties, add a property named schema.registry.url and set its value to the schema registry location.

    The format for the schema registry location is http[s]://<host>:<port>.

    The property displays as follows in the code:

    <KafkaConnection ...>
    	<SenderProperties>
    		<Property name="schema.registry.url" value="http://localhost:8081"/>
    	</SenderProperties>
    	<ReceiverProperties>
    		<Property .../>
    	</ReceiverProperties>
    </KafkaConnection>
  5. Click the Save button in the toolbar to save the changes to the channel connection.
    The schema registry location is updated.

If your NexJ application is currently running, you must redeploy the application for the changes to take effect.
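
Before redeploying, it can help to check that the value follows the http[s]://<host>:<port> format given in step 4. The Python sketch below is one strict reading of that format, requiring an explicit port and no path; the is_valid_registry_url function is hypothetical, and NexJ Studio may accept values that this check rejects.

```python
from urllib.parse import urlsplit


def is_valid_registry_url(url: str) -> bool:
    """Check a value against the http[s]://<host>:<port> format."""
    try:
        parts = urlsplit(url)
        port = parts.port            # raises ValueError for a non-numeric port
    except ValueError:
        return False
    return (
        parts.scheme in ("http", "https")
        and bool(parts.hostname)
        and port is not None         # this strict reading requires a port
        and parts.path in ("", "/")
        and not parts.query
        and not parts.fragment
    )


print(is_valid_registry_url("http://localhost:8081"))   # True
print(is_valid_registry_url("localhost:8081"))          # False
```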

If you want to change the schema registry location in the future, ensure that all existing schemas are also available in the new location with the same schema IDs. Absence of existing schemas in the new location or conflicts in schema IDs can cause failures in processing older messages.
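
A comparison of the two registries before the switch can catch both problems. The Python sketch below assumes you have already exported each registry's contents as a mapping from schema ID to schema text; how you obtain those mappings is outside the sketch, and the find_migration_problems function and the sample schemas are hypothetical.

```python
def find_migration_problems(old: dict, new: dict) -> dict:
    """Compare schema-ID-to-schema mappings from the old and new registries."""
    # Schemas that exist in the old registry but not in the new one.
    missing = sorted(i for i in old if i not in new)
    # IDs present in both registries but pointing to different schemas.
    conflicting = sorted(i for i in old if i in new and new[i] != old[i])
    return {"missing": missing, "conflicting": conflicting}


old_registry = {1: '{"type": "string"}', 2: '{"type": "int"}', 3: '{"type": "long"}'}
new_registry = {1: '{"type": "string"}', 2: '{"type": "boolean"}'}

print(find_migration_problems(old_registry, new_registry))
# {'missing': [3], 'conflicting': [2]}
```

An empty result for both lists suggests that older messages can still be resolved against the new location.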