# Google Cloud Pub/Sub

Google Cloud Pub/Sub: Asynchronous messaging service for ingesting and distributing event data between services and applications. 

To add a [Google Cloud Pub/Sub](<https://cloud.google.com/pubsub?hl=en>) destination to Axoflow, complete the following steps. Axoflow can send data to Google Cloud Pub/Sub using its gRPC interface.

## Prerequisites

  * A [Google Pub/Sub subscription](<https://cloud.google.com/pubsub?hl=en>).
  * An [IAM service account](<https://cloud.google.com/iam/docs/service-account-overview>) that Axoflow uses for authentication.
  * A Google Cloud project that has the Pub/Sub API enabled.



For details, see the [Google Pub/Sub tutorial](<https://cloud.google.com/pubsub/docs/building-pubsub-messaging-system#before_you_begin>).

## Steps

  1. Create a new destination.

     1. Open the AxoConsole.
     2. Select **Destinations > \+ Add Destination**.
  2. Configure the destination.

     1. Select **Pub/Sub**.

     2. Select which type of configuration you want to use:

        * **Simple** : Send all data into a project and topic.
        * **Dynamic** : Send data to a project and topic based on the content or metadata of the incoming messages (or to a default project/topic).
     3. Enter a name for the destination.

![Configure the Google Pub/Sub destination](/docs/axoflow/destinations/google/pubsub/google-pubsub-destination-simple.png)

     4. Specify the project and topic to send the data to.

        * **Simple** : Enter the ID of the GCP **Project** and the **Topic**. All data will be sent to this topic.
        * **Dynamic** : Enter the expression that specifies the default **Project** and **Topic**. The data will be sent here unless it is set during the processing of the message (for example, by the processing steps of the **Flow**).

You can use [AxoSyslog macros](<https://axoflow.com/docs/axosyslog-core/chapter-manipulating-messages/customizing-message-format/reference-macros/>) in this field.

     5. Configure the authentication method to access the GCP project.

        * **Automatic (ADC)** : Use the service account attached to the cloud resource (VM) that hosts AxoRouter.

        * **Service Account File** : Specify the path where a service account key file is located (for example, `/etc/axorouter/user-config/`). You must manually copy that file to its place, currently you can’t distribute it from Axoflow.

        * **None** : Disable authentication completely. Only available when the **More options > Service Endpoint** option is set.

**CAUTION:**

Do not disable authentication in production. 

     6. (Optional) Set other options as needed for your environments.

        * **Service Endpoint** : Use a custom API endpoint. Leave it empty to use the default: `https://pubsub.googleapis.com`
        * **Disk Buffer**: Explicitly enable or disable disk buffer on AxoRouter. Default value: **Enabled**.

AxoRouter can store messages on the local disk if the destination or the network connection to the destination becomes unavailable. AxoRouter automatically sends the stored messages (in the order they were received) to the destination when the connection is reestablished. Note that the disk buffer is separate from AxoStore.

**Disk buffer capacity (bytes per worker)** : Size of the disk buffer files in bytes. This value applies for each worker. Default value is 100MiB (104857600 bytes), which is also the recommended minimum.

The disk required for using disk buffering is **Disk buffer capacity (bytes per worker)*** **Number of Workers** for each destination of AxoRouter.

        * **Batch Bytes**: Sets the maximum size of payload in a batch. If the size of the messages reaches this value, AxoRouter sends the batch to the destination even if the number of messages is less than the value of the batch lines option.

**CAUTION:**

When setting **Batch Bytes** :

          * Note that the actual HTTP request will be about 4KB larger because of the request headers.
          * Check the configuration of your destination, as SaaS solutions and ingress controllers often limit the maximal size of HTTP requests. If the sum of **Batch Bytes** and the request headers exceeds this limit, the data will be rejected. (In case of NGINX, with a `client intended to send too large body` error message.)

        * **Batch Lines**: Number of lines sent to the destination in one batch. AxoRouter waits for this number of lines to accumulate and sends them off in a single batch. Increasing this number increases throughput as more messages are sent in a single batch, but also increases message latency.

Note that **Initial window size** * **Maximum Connections** must be greater or equal than **Batch lines** * **Number of workers** for all the sources that feeds a given destination. Otherwise, the destination worker might be unable to send data until **Batch timeout** expires, even if there’s data available.

For more details, see the [AxoSyslog documentation](<https://axoflow.com/docs/axosyslog-core/chapter-destinations/configuring-destinations-http-nonjava/reference-destination-http-nonjava/#batch-lines>).

        * **Batch Timeout**: Maximal time in milliseconds to wait for a batch to be filled before sending it. The default value (`-1`) means that the batch should be full before sending, which may result in a long wait if the incoming traffic is low. For more details, see the [AxoSyslog documentation](<https://axoflow.com/docs/axosyslog-core/chapter-destinations/configuring-destinations-http-nonjava/reference-destination-http-nonjava/#batch-timeout>).

        * **Number of Workers**: Used for scaling the destination in case of high message load. Specifies the number of worker threads AxoRouter uses for sending messages to the destination. The default is 1. If high message throughput is not a concern, leave it on 1. For maximum performance, increase it up to the number of CPU cores available on AxoRouter. For more details, see the [AxoSyslog documentation](<https://axoflow.com/docs/axosyslog-core/chapter-destinations/configuring-destinations-http-nonjava/reference-destination-http-nonjava/#workers>).
     7. Select **Add**.

  3. Create a [flow](../../../docs/axoflow/data-management/index.md) to connect the new destination to an [AxoRouter instance](../../../docs/axoflow/provisioning/axorouter/index.md).
     1. Select **Flows**.

     2. Select **Add Flow**.

     3. Enter a name for the flow, for example, `my-test-flow`.

![Create a flow](/docs/axoflow/img/data-management/flow-management/flows/create-flow.png)

     4. In the **Router Selector** field, enter an expression that matches the router(s) you want to apply the flow. To select a specific router, use a name selector, for example, `name = my-axorouter-hostname`.

You can use any labels and metadata of the AxoRouter hosts in the Router selectors, for example, the hostname of the AxoRouter, or any [custom labels](../../../docs/axoflow/onboard-hosts/hosts/add-host-metadata/index.md).

        * If you leave the **Router Selector** field empty, the selector will match every AxoRouter instance.
        * To select only a specific AxoRouter instance, set the `name` field to the name of the instance as selector. For example, `name = my-axorouter`.
        * If you set multiple fields in the selector, the selector will match only AxoRouter instances that match all elements of the selector. (There in an AND relationship between the fields.)
     5. Select the **Destination** where you want to send your data. If you don’t have any destination configured, you can select **\+ Add** in the destination section to create a new destination now. For details on the different destinations, see [Destinations](../../../docs/axoflow/destinations/index.md).

        * If you don’t have any destination configured, see [Destinations](../../../docs/axoflow/destinations/index.md).
        * If you’ve already created a [store](../../../docs/axoflow/destinations/axostore/index.md), it automatically available as a destination. Note that the **Router Selector** of the flow must match only AxoRouters that have the selected store available, otherwise you’ll get an error message.
        * If you want to send data to another AxoRouter, enable the **Show all destinations** option, and select the connector of the AxoRouter where you want to send the data.

![AxoRouter as destination](/docs/axoflow/img/data-management/flow-management/flows/axorouter-destination.png)

     6. (Optional) To process the data transferred in the flow, select **Add New Processing Step**. For details, see [Processing steps](../../../docs/axoflow/data-management/processing/index.md). For example:

        1. Add a **Classify** , a **Parse** , and a **Reduce** step, in that order, to automatically remove redundant and empty fields from your data.
        2. To select which messages are processed by the flow, add a **Select Messages** step, and enter a filter into the **AQL Expression** field. For example, to select only the messages received from Fortinet FortiGate firewalls, use the `meta.vendor = fortinet AND meta.product = fortigate` query.
        3. **Save** the processing steps.

![Example processing steps](/docs/axoflow/img/data-management/flow-management/flows/processing/example-processing-steps.png)

     7. Select **Add**.

     8. The new flow appears in the **Flows** list.

![The new flow](/docs/axoflow/img/data-management/flow-management/flows/new-flow.png)




## Pub/Sub attributes

You can embed [custom attributes as metadata in Pub/Sub messages](<https://cloud.google.com/pubsub/docs/publisher#using-attributes>) in the processing steps of the Flow that’s sending data to the Pub/Sub destination.

To do that:

  1. Add a [FilterX processing step](../../../docs/axoflow/data-management/processing/index.md) to the Flow.

  2. Edit the **Statements** field of the processing step:

     1. Add the `meta.pubsub.attributes = json();` line to add an empty JSON object to the messages.

     2. Set your custom attributes under the `meta.pubsub.attributes` key. For example, if you want to include the timestamp as a custom attribute as well, you can use:
```
 meta.pubsub.attributes = {"timestamp": $S_ISODATE};
            
```




## Related message fields

You can use the [following message fields](../../../docs/axoflow/reference/message-schema/reference/index.md#meta.destination.pubsub) to modify messages sent to this destination using [processing steps](../../../docs/axoflow/data-management/processing/index.md).