Kafka produces records to the Kafka brokers configured in it.
     

Take a look at the configuration parameters of the component:
   

  • Kafka Authentication Account (BASIC): if the Kafka server needs authentication, it will be necessary to create an account type BASIC for this component. We also support authentication via Kerberos.

  • Truststore: if it’s necessary to inform a truststore to make the SSL Handshake using private certificates, a CERTIFICATE-CHAIN account type must be created and the concatenated certificates must be informed. It’s optional to inform, in the “password” field, the password to be registered in the truststore creation.

  • Keystore: if it’s necessary to inform a keystore to make the mutual SSL authentication, a CERTIFICATE-CHAIN account type must be created, the complete chain with the concatenated certificates and the private key to be used for the SSL mutual authentication must be informed. If there’s a private key, it’s necessary to inform it in the “password” field.

  • Brokers: brokers of the server (HOST: PORT) used to send registers. To inform multiple HOSTS, you can separate them by comma. Example: HOST1:PORT1,HOST2:PORT2,...,HOSTn:PORTn

  • Security Protocol: way the connection is established. You can use (or not) a security channel (SSL) and an authentication channel (SASL). The use of both (SASL_SSL) is also possible.


IMPORTANT: due to the need of a great memory allocation, we don't support the following types of Security Protocol: PLAINTEXT and SASL_PLAINTEXT. To understand it better, click here.
      


  • Schema Registry URL: if at least one of the options Headers By Avro Schema, Payload As Avro and Partition Key As Avro is activated, the field will be shown to set the Schema Registry’s URL.

  • Schema Registry Account: account to authenticate with the Schema Registry.

  • Schema Registry Truststore: if it’s necessary to inform a truststore to make the SSL Handshake using private certificates, a CERTIFICATE-CHAIN account type must be created, and the concatenated certificates must be informed. It’s optional to inform, in the “password” field, the password to be registered in the truststore creation.

  • Schema Registry Keystore: if it’s necessary to inform a keystore to make the mutual SSL authentication, a CERTIFICATE-CHAIN account type must be created, the complete chain with the concatenated certificates and the private key to be used for the SSL mutual authentication must be informed. If there’s a private key, it’s necessary to inform it in the “password” field.

  • Headers: set of "key": "value" inputs, with headers to be sent in the message (optional field). If the option Headers By Avro Schema is enabled, the field format will be changed to receive a JSON with the Headers values.

  • Binary Headers: if this option is enabled, the header values are considered binary, and are interpreted as a string with base64 representation; otherwise, the header values are interpreted as text.

  • Headers By Avro Schema: if the option is enabled, the component will validate the Headers based on an Avro Schema before sending the Headers.

  • Headers Schema: if the option Headers By Avro Schema is enabled, the field will be shown to set the Headers Schemas to be validated.

  • Headers Charset: name of the characters code for the header values codification (standard UTF-8).

  • Payload: payload to be dispatched.

  • Payload As Avro: if the option is enabled, the component will send the payload in Avro format.

  • Payload Schema: if the option Payload as Avro is enabled, the field will be shown to inform the Payload Schema to be validated.

  • Request Timeout: configuration that controls the maximum time (in milliseconds) that the client waits for the response of an inquiry. If the response isn't received before the maximum time elapses, the inquiry is automatically resent. Otherwise, there'll be an error if the retries are exhausted.

  • Retries: If a value different from 0 (zero) is established, any register whose dispatch fails will be resent. These registers might be resent with a probable transient error.

  • Metadata Timeout: maximum time to the Kafka register dispatch.

  • Key Strategy: if Partition Key As Avro is enabled, the field will be displayed for configuration of the Subject strategy to be used to build the Subject name for the message Keys.

  • Value Strategy: If the Payload as Avro option is enabled, the field will be displayed for configuration of the Subject strategy to be used to build the Subject name for the message values.

  • Fail On Error: If the option is enabled, the execution of the pipeline with error will be interrupted; otherwise, the pipeline execution proceeds, but the result will show a false value for the "success" property.

  • Kerberos Service Name: value defined in the sasl.kerberos.service.name property configured in the Kafka broker server side.

  • Partition Number: specified the numbers of the partition where Kafka Trigger will send the messages to. If the property isn't configured, the Kafka server will be responsible for deciding which topic partitions the message will be sent to.

  • Partition Key: a partition key can be specified to indicate the partition where the message will be sent to. If the field isn't filled, a partitioner based on hashing is used to determine the partition id given to each key.

  • Partition Key As Avro: if the option is enabled, the component will send the partition key in Avro format.

  • Partition Key Schema: if the option Partition Key As Avro is enabled, the field will be shown to inform the partition key Schema to be validated.

  • Producer Client Name: origin identifier of the requests (optional).

  • Acks: configuration for acknowledging the message receipt by the Kafka broker (values: 0, 1 or ALL)


IMPORTANT: The messages sent in Avro format must be of the maximum size supported by Pipelines SMALL, MEDIUM and LARGE. The component does not support extreme reading scenarios of mega/giga/tera/peta bytes.

The Avro format support is currently in Beta phase.


Example of request response to Kafka 

{
  "message": "{}",
  "offset": 201,
  "timestamp": 1585168528773,
  "serializedKeySize": -1,
  "serializedValueSize": 2,
  "topic": "Welcome-Kafka",
  "partition": 1,
  "success": true
}

         

  • message: message sent

  • offset: offset of the record in the topic/partition

  • timestamp: time stamp of the record in the topic/partition

  • serializedKeySize: size of the serialized key, uncompressed in bytes. If the value is null, the returned size is -1

  • serializedValueSize: size of the serialized value, uncompressed in bytes. If the value is null, the returned size is -1

  • topic: name of the topic

  • partition: partition the record was sent to

  • success: if "true", the dispatch was successfully made

        

Messages flow

Input

The component accepts any input message and can use it through Double Braces.

                  

Output

The component doesn't change any information of the input message. Therefore, it's returned to the following component or it's used as final answer if this component is the last step of the pipeline.
 

    

Kafka in Action

Authentication using SSL or SASL

That allows the authentication of your producers and clients to the Kafka cluster (identity verification). This is also a secure way to allow your clients to confirm their identity.

Authentication using Kerberos

To use the authentication via Kerberos in Kafka is necessary to have registered the configuration file “krb5.conf” in the Realm parameter. If you haven't done it yet, get in touch with us by the chat service. After finishing this step, all you have to do is to correctly set a Kerberos-type account and use it in the component.

Did this answer your question?