KCL

Under Review

This page is still under review. None of the content presented here should be construed as finalised work, and no features listed are guaranteed to be included in the eventual v1.0 release.

This document provides an overview of KCL, a Go library designed to streamline FanX Message Broker consumer and producer operations while maintaining high performance. It explains how KCL handles message consumption and production, error handling, graceful shutdowns, and offset management, so you can integrate FanX Message Broker into your projects efficiently.

⚠️ Disclaimer: The primary goal of KCL is high performance in FanX Message Broker client operations. Because of this focus, the consumer implementation does not guarantee exactly-once delivery semantics; make sure you have a deduplication mechanism in place before using KCL for consumption. All producer functions are ack-safe, meaning that if no error is returned, the record was successfully replicated to the number of brokers specified by the topic's replication factor.

Features

  • Simplified Consumer/Producer Operations: KCL simplifies FanX Message Broker consumer and producer operations, abstracting away the complexities of low-level FanX Message Broker APIs and enabling you to concentrate on message processing.
  • Concurrent Topic Consumption: KCL supports concurrent consumption of multiple topics, allowing messages from various topics to be processed concurrently.
  • Error Handling: Comprehensive error handling mechanisms provided by KCL enable the management of retriable and non-retriable errors, ensuring graceful shutdowns during critical error scenarios.
  • Offset Management: KCL manages consumer offsets, facilitating offset commitment, marking records for commit, and forcing the commit of specific records.

How to Initialize a KCL Client

import "bitbucket.org/InCrowd-Sports/kcl"

client, err := kcl.New(
kcl.WithEnv("stage"),
kcl.WithServiceName("example-app"),
kcl.WithAuth("username", "password"),
kcl.WithMaxFetchRecords(1000),
kcl.WithMinFetchBytes(500 << (10 * 1)), // 500 KB
kcl.WithMaxFetchWait(10 * time.Second),
kcl.WithDefaultConsumeTopics("topic.1", "topic.2"),
)
if err != nil {
// handle err
}

defer client.Close()

You will probably also want to register additional topics on the client you just created:

client.AddTopics("topic.3", "topic.4")

How to Produce Messages

KCL sends FanX Message Broker records to the specified topic. Records are produced in order per partition when production succeeds. If a record's timestamp is unset, it defaults to the current time.

If the topic field is empty or the record is too large to fit in a batch, the record fails immediately. If the producer buffer is full, Produce either blocks until space is available or fails the record immediately, depending on how the client is configured.

A record can also be cancelled if its context is cancelled, the record times out, or the record hits its maximum number of retries.

Currently, KCL allows callers to produce messages using a batching approach based on the specified Priority. The allowed Priority values are:

  • LowPriority = 0

  • NormalPriority = 10

  • HighPriority = 20

  • ImmediatePriority = 30 -> Events with Immediate priority (or higher) will skip batching and will be produced immediately.

  • Produce Options (see the sketch after this list):

    • WithKey(key []byte) -> Specifies the event key to produce.
    • WithIntKey(key int64) -> Specifies a numeric event key to produce.
    • WithHeaders(headers []Header) -> Adds headers (key-value pairs) to the events being produced.
    • WithPriority(priority Priority) -> Sets the event Priority.
    • WithSignature(enabled bool) -> Enables/disables the event signature when producing.
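
The produce call itself is not shown in this document, so the following is a minimal sketch only: the client.Produce method name and its (ctx, topic, value, options...) shape are assumptions, while the option constructors and priority constants are the ones documented above.

err := client.Produce(
    context.Background(),
    "topic.1",                       // destination topic
    []byte(`{"example":"payload"}`), // record value
    kcl.WithKey([]byte("example-key")),
    kcl.WithPriority(kcl.HighPriority),
    kcl.WithSignature(true),
)
if err != nil {
    // handle err
}

With kcl.ImmediatePriority (or higher) the record would skip batching and be produced immediately, as described above.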

How to Consume Messages

  • Consume Individually: Process each record individually.
err := client.Consume(context.Background(), func(record *kcl.Record) error {
    fmt.Printf("record value: %s", string(record.Value))
    return nil
})

Consume accepts options that configure how messages are consumed.

By default, auto-commit and auto-release are enabled, so records cannot be used outside the scope of the processor function.

  • Consume Individually (with Options):
// KCL will spawn 10 workers for record processing; the remaining options are self-explanatory
err := client.Consume(context.Background(), func(record *kcl.Record) error {
    fmt.Printf("record value: %s", string(record.Value))
    return nil
}, kcl.WithParallelism(10), kcl.WithoutAutoCommit(), kcl.WithoutAutoRelease(), kcl.WithoutSignatureVerification())
  • Consume All Records: Perform chunk consumption, processing multiple records at once.
err := client.ConsumeAll(context.Background(), func(records []*kcl.Record) error {
    fmt.Printf("chunk size: %d", len(records))
    return nil
})
  • Consume All Records Per Topic: Chunk consumption grouped by topic for efficient processing.
err := client.ConsumePerTopic(context.Background(), func(topic string, records []*kcl.Record) error {
    fmt.Printf("[%s] chunk size: %d", topic, len(records))
    return nil
})

Please note that a new goroutine will be spawned for each topic.

Only the first non-nil error will be returned. Use ConsumeRaw if you need to manually handle each error.

  • Consume All Records Per Partition:
err := client.ConsumePerPartition(context.Background(), func(topic string, partition int32, records []*kcl.Record) error {
    for _, record := range records {
        // process record
    }

    return nil
})
  • Consume Options (a combined example follows this list):
    • WithTopics(topics ...string) -> Specifies which topics to consume.
    • WithParallelism(parallelism int) -> Sets how many workers will consume in parallel.
    • WithAutoCommit(enabled bool) -> Enables/disables committing automatically after consuming an event.
    • WithAutoRelease(enabled bool) -> Enables/disables releasing the event automatically after consuming.
    • WithSignatureVerification(enabled bool) -> Enables/disables signature verification when consuming events.
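
As a combined sketch of the options above, the call below restricts consumption to explicitly listed topics; the option names are the documented ones, while the processor body is illustrative only.

err := client.Consume(context.Background(), func(record *kcl.Record) error {
    // process record
    return nil
}, kcl.WithTopics("topic.3", "topic.4"), kcl.WithParallelism(4))
if err != nil {
    // handle err
}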

Error Handling and Graceful Shutdown

  • KCL automatically handles retriable errors, logging them with LevelError. Non-retriable errors halt the consumption function, returning the first encountered error.
  • In the event of client or consumption context cancellation, nil will be returned (only for context.Canceled).
  • If the event processor encounters an unrecoverable error, it should return a non-nil error, which will halt the consumer. Manual unsubscription from topics (e.g., calling RemoveTopics) is required in such cases.
  • To gracefully shut down the library, utilize client.Stop() or cancel the context passed to kcl.New(ctx).
  • To stop only the consumer, use PauseTopics (to halt consumption for specific topics), RemoveTopics (to entirely remove metadata for specific topics), or cancel the context passed to the consumer to exit the poll loop.
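
A minimal shutdown sketch tying these pieces together; client.Consume, client.Stop, and client.Close are the calls described in this document, while the signal handling and goroutine wiring around them are assumptions.

import (
    "context"
    "os/signal"
    "syscall"
)

// Cancel the consume context on SIGINT/SIGTERM.
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()

go func() {
    err := client.Consume(ctx, func(record *kcl.Record) error {
        // process record
        return nil
    })
    if err != nil {
        // a non-retriable error halted the consumer
    }
}()

<-ctx.Done()   // wait for a shutdown signal
client.Stop()  // gracefully stop the client (triggers CommitMarked for consumed topics)
client.Close() // release underlying resources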

Committing Offsets

  • By default, KCL does not commit every consumed record but waits for records to be explicitly marked as 'processed' to avoid overwhelming FanX Message Broker brokers in high-throughput scenarios.
  • Utilize WithAutoCommitInterval(5 * time.Second) to specify how often KCL should commit marked records.
  • Manually commit marked records using CommitMarked(ctx).
  • Use ForceCommit(ctx, records) to push specific offsets without waiting for auto-commit.
  • client.Stop() triggers CommitMarked for all consumed topics, but it is advisable to also defer the call manually; otherwise processed offsets may be lost, leading to duplicate processing.
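
A minimal sketch of this commit flow; WithAutoCommitInterval, CommitMarked, and ForceCommit are the calls described above, while passing WithAutoCommitInterval to kcl.New and the surrounding error handling are assumptions.

// Commit marked records every 5 seconds (assumes the option is accepted by kcl.New).
client, err := kcl.New(
    kcl.WithEnv("stage"),
    kcl.WithServiceName("example-app"),
    kcl.WithAutoCommitInterval(5 * time.Second),
)
if err != nil {
    // handle err
}

// Defer a manual commit of marked records so processed offsets are not lost on shutdown.
defer func() {
    if err := client.CommitMarked(context.Background()); err != nil {
        // handle commit error
    }
}()

// Or push specific offsets immediately, without waiting for the auto-commit interval.
// records ([]*kcl.Record) would come from a consume callback.
if err := client.ForceCommit(context.Background(), records); err != nil {
    // handle commit error
}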