kafka: Publish messages to Apache Kafka (Java implementation)

Starting with version 3.7, AxoSyslog can directly publish log messages to the Apache Kafka message bus, where subscribers can access them.

  • Since AxoSyslog uses the official Java Kafka producer, the kafka destination has significant memory usage.

Declaration:

    @include "scl.conf"

    kafka(
        client-lib-dir("/opt/syslog-ng/lib/syslog-ng/java-modules/:<path-to-preinstalled-kafka-libraries>")
        kafka-bootstrap-servers("1.2.3.4:9092,192.168.0.2:9092")
        topic("${HOST}")
    );

Example: Sending log data to Apache Kafka

The following example defines a kafka destination, using only the required parameters.

    @include "scl.conf"

    destination d_kafka {
      kafka(
        client-lib-dir("/opt/syslog-ng/lib/syslog-ng/java-modules/KafkaDestination.jar:/usr/share/kafka/lib/")
        kafka-bootstrap-servers("1.2.3.4:9092,192.168.0.2:9092")
        topic("${HOST}")
      );
    };
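
A destination only receives messages once a log path references it. The following is a minimal sketch of such a log path, assuming the d_kafka destination defined above. The source name s_local and the choice of the system() and internal() sources are illustrative assumptions, not part of the original example.

    # s_local is a hypothetical source name used only for this sketch
    source s_local {
        system();
        internal();
    };

    # Route everything collected by s_local to the Kafka destination
    log {
        source(s_local);
        destination(d_kafka);
    };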

The kafka() driver is actually a reusable configuration snippet configured to receive log messages using the Java language-binding of AxoSyslog. For details on using or writing such configuration snippets, see Reusing configuration blocks. You can find the source of the kafka configuration snippet on GitHub. For details on extending AxoSyslog in Java, see the Getting started with syslog-ng development guide.

1 - Prerequisites

To publish messages from AxoSyslog to Apache Kafka, complete the following steps.

Steps:

  1. If you want to use the Java-based modules of AxoSyslog (for example, the Elasticsearch, HDFS, or Kafka destinations), you must compile AxoSyslog with Java support.

    • Download and install the Java Runtime Environment (JRE), version 1.7 or newer. You can use OpenJDK or Oracle JDK; other implementations are not tested.

    • Install gradle version 2.2.1 or newer.

    • Set LD_LIBRARY_PATH to include the directory that contains the libjvm.so file, for example: LD_LIBRARY_PATH=/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/amd64/server:$LD_LIBRARY_PATH

      Note that many platforms have simplified links for Java libraries. Use the simplified path if available. If you use a startup script to start AxoSyslog, set LD_LIBRARY_PATH in the script as well.

    • If you are behind an HTTP proxy, create a gradle.properties under the modules/java-modules/ directory. Set the proxy parameters in the file. For details, see The Gradle User Guide.

  2. Download the latest stable binary release of the Apache Kafka libraries (version 0.9 or newer) from http://kafka.apache.org/downloads.html.

  3. Extract the Apache Kafka libraries into a single directory. If needed, collect the various .jar files into a single directory (for example, /opt/kafka/lib/) where AxoSyslog can access them. You must specify this directory in the AxoSyslog configuration file.

  4. Check if the following files in the Kafka libraries have the same version number: slf4j-api-<version-number>.jar, slf4j-log4j12-<version-number>.jar. If the version number of these files is different, complete the following steps:

    1. Delete one of the files (for example, slf4j-log4j12-<version-number>.jar).

    2. Download a version that matches the version number of the other file (for example, 1.7.6) from the official SLF4J distribution.

    3. Copy the downloaded file into the directory of your Kafka library files (for example, /opt/kafka/lib/).
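
Once the .jar files are collected in one directory, the destination can point to it. The following is a sketch, assuming the libraries are under /opt/kafka/lib/ (the example directory from the steps above) and AxoSyslog is installed under /opt/syslog-ng. The destination name, broker address, and topic are placeholders.

    @include "scl.conf"

    destination d_kafka_prereq {
        kafka(
            # The AxoSyslog Kafka module first, then the directory holding the Kafka .jar files
            client-lib-dir("/opt/syslog-ng/lib/syslog-ng/java-modules/KafkaDestination.jar:/opt/kafka/lib/*.jar")
            kafka-bootstrap-servers("127.0.0.1:9092")
            topic("syslog")
        );
    };

The *.jar wildcard form follows the client-lib-dir() example given for the kafka destination; other examples in this document pass the bare directory instead.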

2 - How AxoSyslog interacts with Apache Kafka

When you stop the AxoSyslog application, AxoSyslog does not exit until all Java threads have finished, including the threads started by the Kafka Producer. There is no way (except for the kill -9 command) to stop AxoSyslog before the Kafka Producer stops. To change this behavior, set the properties of the Kafka Producer in its properties file, and reference the file in the properties-file() option.

The AxoSyslog kafka destination tries to reconnect to the brokers in a tight loop. Because this produces many similar debug messages, it can look as if AxoSyslog were spinning. To reduce the number of such messages, set a longer backoff time using the following properties:

    retry.backoff.ms=1000
    reconnect.backoff.ms=1000

For details on using property files, see properties-file(). For details on the properties that you can set in the property file, see the Apache Kafka documentation.
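
As a sketch of how these settings could be wired in, assume the two properties above are saved in /opt/syslog-ng/etc/kafka_dest.properties. The file path, destination name, broker address, and topic are placeholders chosen for illustration.

    @include "scl.conf"

    destination d_kafka_backoff {
        kafka(
            client-lib-dir("/opt/syslog-ng/lib/syslog-ng/java-modules/KafkaDestination.jar:/usr/share/kafka/lib/")
            kafka-bootstrap-servers("127.0.0.1:9092")
            topic("syslog")
            # Assumed to contain retry.backoff.ms and reconnect.backoff.ms
            properties-file("/opt/syslog-ng/etc/kafka_dest.properties")
        );
    };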

3 - Kafka destination options

The kafka destination of AxoSyslog can directly publish log messages to the Apache Kafka message bus, where subscribers can access them. The kafka destination has the following options.

Required options:

The following options are required: kafka-bootstrap-servers(), topic(). Note that to use kafka, you must add the following line to the beginning of your AxoSyslog configuration:

    @include "scl.conf"

client-lib-dir()

Type:string
Default:The AxoSyslog module directory: /opt/syslog-ng/lib/syslog-ng/java-modules/

Description: The colon-separated list of paths where the required Java classes are located. For example, client-lib-dir("/opt/syslog-ng/lib/syslog-ng/java-modules/:/opt/my-java-libraries/libs/"). If you set this option multiple times in your AxoSyslog configuration (for example, because you have multiple Java-based destinations), AxoSyslog merges all available paths into a single list.

For the kafka destination, include the path to the directory where you copied the required libraries (see Prerequisites), for example, client-lib-dir("/opt/syslog-ng/lib/syslog-ng/java-modules/KafkaDestination.jar:/usr/share/kafka/lib/*.jar").

kafka-bootstrap-servers()

Type:list of hostnames
Default:N/A

Description: Specifies the hostname or IP address of the Kafka server. You can use IPv4 (for example, 192.168.0.1) or IPv6 (for example, [::1]) addresses. Use a colon (:) after the address to specify the port number of the server. To specify multiple addresses, separate them with commas, for example, kafka-bootstrap-servers("127.0.0.1:2525,remote-server-hostname:6464").

frac-digits()

Type:number
Default:0

Description: The AxoSyslog application can store fractions of a second in the timestamps according to the ISO8601 format. The frac-digits() parameter specifies the number of digits stored. The digits storing the fractions are padded by zeros if the original timestamp of the message specifies only seconds. Fractions can always be stored for the time the message was received.
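
A sketch of keeping millisecond precision in the published timestamp, assuming a destination whose template includes the ISODATE macro. The destination name, broker address, and topic are placeholders.

    @include "scl.conf"

    destination d_kafka_ms {
        kafka(
            client-lib-dir("/opt/syslog-ng/lib/syslog-ng/java-modules/KafkaDestination.jar:/usr/share/kafka/lib/")
            kafka-bootstrap-servers("127.0.0.1:9092")
            topic("syslog")
            # Store three digits of the fractional second (milliseconds)
            frac-digits(3)
            template("$ISODATE $HOST $MSGHDR$MSG\n")
        );
    };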

hook-commands()

Description: This option makes it possible to execute external programs when the relevant driver is initialized or torn down. The hook-commands() can be used with all source and destination drivers with the exception of the usertty() and internal() drivers.

Using hook-commands() when AxoSyslog starts or stops

To execute an external program when AxoSyslog starts or stops, use the following options:

startup()

Type:string
Default:N/A

Description: Defines the external program that is executed as AxoSyslog starts.

shutdown()

Type:string
Default:N/A

Description: Defines the external program that is executed as AxoSyslog stops.

Using hook-commands() when AxoSyslog reloads

To execute an external program when the AxoSyslog configuration is initiated or torn down, for example, on startup/shutdown or during an AxoSyslog reload, use the following options:

setup()

Type:string
Default:N/A

Description: Defines an external program that is executed when the AxoSyslog configuration is initiated, for example, on startup or during an AxoSyslog reload.

teardown()

Type:string
Default:N/A

Description: Defines an external program that is executed when the AxoSyslog configuration is stopped or torn down, for example, on shutdown or during an AxoSyslog reload.

Example: Using hook-commands() with a network source

In the following example, hook-commands() is used with the network() driver to open an iptables port when AxoSyslog starts and to close it when AxoSyslog stops.

The assumption in this example is that the LOGCHAIN chain is part of a larger ruleset that routes traffic to it. While the rule created by AxoSyslog is in place, packets can flow; otherwise, the port is closed.

    # s_network is an arbitrary name; a top-level source statement needs one
    source s_network {
        network(
            transport(udp)
            hook-commands(
                startup("iptables -I LOGCHAIN 1 -p udp --dport 514 -j ACCEPT")
                shutdown("iptables -D LOGCHAIN 1")
            )
        );
    };
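
For comparison, the following sketch manages the same rule with setup() and teardown(), so the rule is also removed and re-created on every reload. The source name s_network_reload is a placeholder, and the LOGCHAIN chain is assumed to exist as described above.

    # s_network_reload is a hypothetical source name used only for this sketch
    source s_network_reload {
        network(
            transport(udp)
            hook-commands(
                # Runs when the configuration is initiated (startup and reload)
                setup("iptables -I LOGCHAIN 1 -p udp --dport 514 -j ACCEPT")
                # Runs when the configuration is torn down (shutdown and reload)
                teardown("iptables -D LOGCHAIN 1")
            )
        );
    };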

jvm-options()

Type:list
Default:N/A

Description: Specify the Java Virtual Machine (JVM) settings of your Java destination from the AxoSyslog configuration file.

For example:

   jvm-options("-Xss1M -XX:+TraceClassLoading")

You can set this option only as a global option, by adding it to the options statement of the syslog-ng.conf configuration file.
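
As a sketch, this is how the example above could look when placed in the global options statement. The JVM flags are the ones from the example and are illustrative only.

    options {
        # Applies to the JVM shared by every Java-based destination
        jvm-options("-Xss1M -XX:+TraceClassLoading");
    };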

on-error()

Type:One of: drop-message, drop-property, fallback-to-string, silently-drop-message, silently-drop-property, silently-fallback-to-string
Default:Use the global setting (which defaults to drop-message)

Description: Controls what happens when type-casting fails and AxoSyslog cannot convert some data to the specified type. By default, AxoSyslog drops the entire message and logs the error. Currently the value-pairs() option uses the settings of on-error().

  • drop-message: Drop the entire message and log an error message to the internal() source. This is the default behavior of AxoSyslog.
  • drop-property: Omit the affected property (macro, template, or message-field) from the log message and log an error message to the internal() source.
  • fallback-to-string: Convert the property to string and log an error message to the internal() source.
  • silently-drop-message: Drop the entire message silently, without logging the error.
  • silently-drop-property: Omit the affected property (macro, template, or message-field) silently, without logging the error.
  • silently-fallback-to-string: Convert the property to string silently, without logging the error.
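
The following sketch overrides the global setting for a single destination that publishes JSON, so that a failed type conversion keeps the value as a string instead of dropping the whole message. The destination name, broker address, and topic are placeholders, and the template is the format-json() example used elsewhere in this document.

    @include "scl.conf"

    destination d_kafka_json {
        kafka(
            client-lib-dir("/opt/syslog-ng/lib/syslog-ng/java-modules/KafkaDestination.jar:/usr/share/kafka/lib/")
            kafka-bootstrap-servers("127.0.0.1:9092")
            topic("syslog")
            template("$(format-json --scope rfc5424 --exclude DATE --key ISODATE)")
            # Keep the message and log the error if a value cannot be type-cast
            on-error("fallback-to-string")
        );
    };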

key()

Type:template
Default:N/A

Description: The key of the partition under which the message is published. You can use templates to change the key dynamically based on the source or the content of the message, for example, key("${PROGRAM}").
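
A sketch of combining a fixed topic with a per-host key. The destination name, broker address, and topic name are placeholders.

    @include "scl.conf"

    destination d_kafka_keyed {
        kafka(
            client-lib-dir("/opt/syslog-ng/lib/syslog-ng/java-modules/KafkaDestination.jar:/usr/share/kafka/lib/")
            kafka-bootstrap-servers("127.0.0.1:9092")
            # All messages go to one topic
            topic("syslog")
            # The sending host is used as the message key
            key("${HOST}")
        );
    };

With the default partitioner of the Kafka producer, records that share a key are assigned to the same partition, so messages from one host stay in order as long as the number of partitions does not change.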

log-fifo-size()

Type:number
Default:Use global setting.

Description: The number of messages that the output queue can store.

properties-file()

Type:string (absolute path)
Default:N/A

Description: The absolute path and filename of the Kafka properties file to load. For example, properties-file("/opt/syslog-ng/etc/kafka_dest.properties"). The AxoSyslog application reads this file and passes the properties to the Kafka Producer. If a property is defined both in the AxoSyslog configuration file (syslog-ng.conf) and in the properties file, then AxoSyslog uses the definition from the AxoSyslog configuration file.

The AxoSyslog kafka destination supports all properties of the official Kafka producer. For details, see the Apache Kafka documentation.

The kafka-bootstrap-servers option is translated to the bootstrap.servers property.

For example, the following properties file defines the acknowledgment method and compression:

    acks=all
    compression.type=snappy

retries()

Type:number (of attempts)
Default:3

Description: If AxoSyslog cannot send a message, it tries again until the number of attempts reaches retries().

If the number of attempts reaches retries(), AxoSyslog waits for the time set in time-reopen(), then tries sending the message again.

sync-send()

Type:true | false
Default:false

Description: When sync-send() is set to true, AxoSyslog sends the message reliably: it sends a message to the Kafka server, then waits for a reply. If sending fails, AxoSyslog resends the message, up to the number of attempts set in the retries() parameter. If sending the message fails retries() times, AxoSyslog drops the message.

This method ensures reliable message transfer, but is very slow.

When sync-send() is set to false, AxoSyslog sends messages asynchronously, and receives the response asynchronously. In case of a problem, AxoSyslog cannot resend the messages.

This method is fast, but the transfer is not reliable. Several thousand messages can be lost before AxoSyslog recognizes the error.
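
A sketch of a destination that trades throughput for reliability, assuming that five attempts per message are acceptable. The retry count, destination name, broker address, and topic are placeholders, and the quoted "true" value mirrors the true/false wording of this section.

    @include "scl.conf"

    destination d_kafka_sync {
        kafka(
            client-lib-dir("/opt/syslog-ng/lib/syslog-ng/java-modules/KafkaDestination.jar:/usr/share/kafka/lib/")
            kafka-bootstrap-servers("127.0.0.1:9092")
            topic("syslog")
            # Wait for the reply of the Kafka server after every message
            sync-send("true")
            # Number of attempts per message (illustrative value)
            retries(5)
        );
    };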

template()

Type:template or template function
Default:$ISODATE $HOST $MSGHDR$MSG\n

Description: The message as published to Apache Kafka. You can use templates and template functions (for example, format-json()) to format the message, for example, template("$(format-json --scope rfc5424 --exclude DATE --key ISODATE)").

For details on formatting messages in JSON format, see format-json.

throttle()

Type:number
Default:0

Description: Sets the maximum number of messages sent to the destination per second. Use this output-rate-limiting functionality only when using disk-buffer as well to avoid the risk of losing messages. Specifying 0 or a lower value sets the output limit to unlimited.

topic()

Type:template
Default:N/A

Description: The Kafka topic under which the message is published. You can use templates to change the topic dynamically based on the source or the content of the message, for example, topic("${HOST}").

time-zone()

Type:name of the timezone, or the timezone offset
Default:unspecified

Description: Convert timestamps to the timezone specified by this option. If this option is not set, then the original timezone information in the message is used. Converting the timezone changes the values of all date-related macros derived from the timestamp, for example, HOUR. For the complete list of such macros, see Date-related macros.

The timezone can be specified by name, for example, time-zone("Europe/Budapest"), or as a timezone offset in +/-HH:MM format, for example, time-zone("+01:00"). On Linux and UNIX platforms, the valid timezone names are listed under the /usr/share/zoneinfo directory.
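
A sketch of converting timestamps for a single destination. The destination name, broker address, and topic are placeholders.

    @include "scl.conf"

    destination d_kafka_tz {
        kafka(
            client-lib-dir("/opt/syslog-ng/lib/syslog-ng/java-modules/KafkaDestination.jar:/usr/share/kafka/lib/")
            kafka-bootstrap-servers("127.0.0.1:9092")
            topic("syslog")
            # Timezone given by name; an offset such as "+01:00" is the other accepted form
            time-zone("Europe/Budapest")
        );
    };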

ts-format()

Type:rfc3164, bsd, rfc3339, iso
Default:rfc3164

Description: Override the global timestamp format (set in the global ts-format() parameter) for the specific destination. For details, see ts-format().