How to Implement the Pipes and Filters Architecture with Java and Azure

In my previous blog post, I provided an in-depth explanation of the Pipes and Filters architecture, which you can check out here. To recap, the Pipes and Filters architecture breaks down a system into small, self-contained processing components known as filters. Each filter is responsible for performing a specific task or transformation on the data it receives, promoting modularity and reusability. These filters are connected via pipes, which facilitate the flow of data from one filter to the next. This architecture is particularly effective in scenarios involving data integration, processing workflows, transformation pipelines, and stream processing.

In this blog post, I will walk you through a sample implementation of the Pipes and Filters architecture using Java and Azure. Our example project will centre around a chatbot designed to assist in creative thinking activities such as brainstorming.

Sample Project Overview

The goal of this project is to create a tool that integrates with a company’s creative thinking solutions. Specifically, it’s a chatbot that aids teams during brainstorming sessions and other creative activities. The process begins when a user interacts with the application by typing a question, such as “How will the ongoing AI revolution affect the financial industry?” This question is then sent to the application for processing.

How the System Works

  1. Input Validation: The first filter is responsible for validating the user’s question. The question might be in a language that the AI model doesn’t understand, or it might be too long or contain sensitive information. Therefore, the first task is to verify whether the question can be processed further.
  2. Prompt Engineering: If the question is valid, the application uses predefined templates to enrich it. These templates provide context to the AI-powered tool, making the model’s output more valuable. For example, a template might be: “You are a CEO. Given a strategic prompt, you will create X futuristic, hypothetical scenarios that happen Y years from now. The strategic prompt is: Z”. This step is crucial as it leverages prompt engineering to guide the AI model in generating more meaningful responses.
  3. AI Model Interaction: The final step involves sending the enriched prompts to the AI model, which processes them and generates answers. These answers are then displayed back to the user.

Implementation Details

The system consists of three filters:

  1. Input Validation Filter: Validates the user’s input according to the application’s data requirements.
  2. Prompt Engineering Filter: Analyses and enriches the validated input to create a prompt.
  3. AI Model Facade Filter: Sends the engineered prompt to the AI model and handles the response.

The First Filter: Input Validation

The first filter is implemented as an Azure Function, and its primary role is to validate the incoming question.

@FunctionName("QuestionValidationFunction")
public HttpResponseMessage validate(
       @HttpTrigger(name = "question",
               methods = {HttpMethod.POST},
               authLevel = AuthorizationLevel.FUNCTION)
       HttpRequestMessage<String> question,
       @QueueOutput(name = "questionQueue",
               queueName = "question-queue",
               connection = "AzureWebJobsStorage")
       OutputBinding<String> questionQueue,
       ExecutionContext executionContext) {
    // Implementation of validation.
}

The validate method, annotated with @FunctionName("QuestionValidationFunction"), is triggered by an HTTP request. It takes two parameters: the HTTP request containing the question and an output binding to a storage queue named "question-queue". The method validates the question and, if valid, sends it down the pipeline.

The Second Filter: Prompt Engineering

The second filter enriches the validated question with a template to maximise the AI model’s response quality.

@FunctionName("PromptEngineeringFunction")
public void sendPrompt(
       @QueueTrigger(
               name = "question",
               queueName = "question-queue",
               connection = "AzureWebJobsStorage")
       String question,
       @QueueOutput(
               name = "promptQueue",
               queueName = "prompt-queue",
               connection = "AzureWebJobsStorage")
       OutputBinding<String> promptQueue,
       ExecutionContext executionContext) {
   // Prompt engineering logic.
}

This function is triggered by messages in the "question-queue". When a new message arrives, the function is invoked, and the question is enriched before being sent to the next queue, "prompt-queue".

The Third Filter: AI Model Facade

The third filter handles communication with the AI model. This filter is implemented using the Spring Cloud Function framework, which decouples infrastructure configuration from the business logic. I’ll describe it in detail in the next blog post, but I’ll give you a short description here so you understand the code.

The functions are implemented as Java function interfaces and autowired into respective request handlers. The handlers contain logic that configures integration with the serverless platform provider. In our case it’ll be the Azure SDK (which examples we’ve seen before). With this setup, you can change the cloud provider by simply rewriting the handlers (and changing build definition) without any need to rewrite the functions itself. 

Let’s now look at the function’s code. 

@Bean
public Function<String, String> answer(ModelClient modelClient) {
   	// Function’s logic
}

The answer function is a simple Java function interface that handles the logic for interacting with the AI model. It is autowired into a handler that manages the integration with Azure.

@Component
public class AnswerHandler {

   private final Function<String, String> answer;

   public AnswerHandler(Function<String, String> answer) {
       this.answer = answer;
   }

   @FunctionName("answer")
   public void answer(
           @QueueTrigger(
                   name = "promptQueue",
                   queueName = "prompt-queue",
                   connection = "AzureWebJobsStorage")
           String prompt,
           @QueueOutput(
                   name = "answerQueue",
                   queueName = "answer-queue",
                   connection = "AzureWebJobsStorage")
           OutputBinding<String> answerQueue
   ) {
       // Handler’s logic
   }
}

This handler is similar to the previous filters, but it delegates the business logic to the answer function. The answerQueue is used to send the final answer for further consumption. 

Deployment

With all three filters implemented, you can now deploy the application to Azure, to play with the code. The deployment process can be accomplished using Maven, as described in this article

In summary, we implemented a complete Pipes and Filters architecture using both the Azure SDK and Spring Cloud Function. The system comprises three filters – each responsible for a distinct part of the application’s workflow: input validation, prompt engineering, and AI model communication. The unidirectional data flow is managed primarily by queues, ensuring a clean separation of concerns and easy scalability.

Summary

This blog post demonstrates how to implement the Pipes and Filters architecture using Java and Azure for a chatbot that assists in creative thinking activities. The architecture is broken down into three filters: input validation, prompt engineering, and AI model interaction. Each filter handles a specific task in the data processing pipeline, ensuring modularity and reusability. The post also covers the deployment process using Azure and Spring Cloud Function, highlighting the benefits of separating business logic from infrastructure configuration.

If you’re interested in how this architecture style can be used to implement serverless solutions, and how to work with Azure Functions in Java, check out my Udemy course that covers these topics in detail.  

The working code example can be found on my GitHub

Generating Avro Schemas in Kotlin

When working with Avro schemas, the most popular approach is to generate Java or Kotlin classes based on predefined Avro schemas. This method is particularly useful if your application acts as a consumer or if the schemas are provided by someone else. However, if you are the sole publisher for a topic, you might prefer to generate Avro schemas directly from your model classes.

A handy tool for this task is the avro4k library. This third-party library simplifies the process of generating Avro schemas from Kotlin data classes. It leverages the powerful kotlinx.serialisation library, making the integration straightforward and efficient.

The kotlinx.serialisation library is well-documented and provides extensive resources to help developers get started. You can explore its official documentation here. However, a significant limitation is that the provided setup guides are primarily focused on Gradle. If you are using Maven (as I usually do), you might find the lack of specific instructions frustrating.

In this tutorial, I will guide you through setting up a Maven build, writing a simple Kotlin model, and generating an Avro schema from it using avro4k and kotlinx.serialisation.

Setting Up a Maven Project for Kotlinx serialisation and Avro4k

In this section, I will walk through the steps to set up a Maven project for generating Avro schemas from Kotlin data classes using the kotlinx.serialisation library and the avro4k library. We will start by creating a new Maven project, configuring the necessary dependencies, and finally setting up the Kotlin Maven Plugin.

Step 1: Create a New Project from Maven Archetype

First, create a new Maven project using the Kotlin archetype org.jetbrains.kotlin:kotlin-archetype-jvm with version 2.0.0. This archetype provides a basic project structure for Kotlin applications.

Step 2: Replace JUnit 4 with JUnit 5

The default setup includes JUnit 4, which we need to replace with JUnit 5 to take advantage of the latest testing features. Let’s update your pom.xml to include the JUnit 5 dependency.

Step 3: Add the avro4k Dependency

Next, add the avro4k dependency to your pom.xml.

<dependency>
<groupId>com.github.avro-kotlin.avro4k</groupId>
<artifactId>avro4k-core</artifactId>
<version>${avro4k-core.version}</version>
</dependency>

The variable avro4k-core.version points to version 1.10.1.

This library contains all the logic that we need to generate Avro schemas directly from Kotlin data classes.

Step 4: Configure the Kotlin Maven Plugin

Now, configure the Kotlin Maven Plugin to include the kotlinx.serialisation compiler plugin. Add the following configuration to your pom.xml:

<build>
    <sourceDirectory>src/main/kotlin</sourceDirectory>
    <testSourceDirectory>src/test/kotlin</testSourceDirectory>
    <plugins>
        <plugin>
            <groupId>org.jetbrains.kotlin</groupId>
            <artifactId>kotlin-maven-plugin</artifactId>
            <version>${kotlin-dependencies.version}</version>
            <executions>
                <execution>
                    <id>compile</id>
                    <phase>compile</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>test-compile</id>
                    <phase>test-compile</phase>
                    <goals>
                        <goal>test-compile</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <languageVersion>${kotlin.version}</languageVersion>
                <jvmTarget>${java.version}</jvmTarget>
                <compilerPlugins>
                    <plugin>kotlinx-serialization</plugin>
                </compilerPlugins>
            </configuration>
            <dependencies>
                <dependency>
                    <groupId>org.jetbrains.kotlin</groupId>
                    <artifactId>kotlin-maven-serialization</artifactId>
                    <version>${kotlin-dependencies.version}</version>
                </dependency>
            </dependencies>
        </plugin>
    </plugins>
</build>

Two parts of this configuration are crucial for setting up Maven to work properly with Kotlinx serialisation: compilerPlugins and their dependencies.

First, you have to add the kotlinx-serialization plugin to the compilerPlugins section. It specifies additional compiler plugins to be used during the compilation process that will enhance or modify the compilation. However, it is just a declaration. It will not automatically download necessary dependencies.

In order to configure it as well, you have to add the following definition to the dependencies section of the plugin configuration.

<dependency>
    <groupId>org.jetbrains.kotlin</groupId>
    <artifactId>kotlin-maven-serialization</artifactId>
    <version>${kotlin-dependencies.version}</version>
</dependency>

This setup ensures that the Kotlin serialisation plugin is correctly applied during the build process.

Step 5: Re-import the Project in IntelliJ IDEA

If you are using IntelliJ IDEA as your development environment, re-import the Maven project to apply the new configurations. Occasionally, IntelliJ IDEA’s compiler might not recognize symbols from the Kotlin serialisation library. If this happens, invalidate caches and restart the IDE.

Implementing a Simple Model for Avro Serialisation

To demonstrate the use of Kotlinx Serialization and Avro4k, let’s implement a simple model for a book. For simplicity, we will keep all the related classes in one file since the model is not very big. Here is how you can define the necessary classes:

enum class CurrencyType {
    USD, EUR, NOK // Can be extended if needed.
}

data class Price(
    @Serializable(with = BigDecimalSerializer::class) var amount: BigDecimal,
    var currency: CurrencyType
)

data class Author(
    val firstName: String,
    val lastName: String
)

data class Book(
    val title: String,
    val author: Author,
    @Serializable(with = YearSerializer::class) val publicationYear: Year,
    val numberOfPages: Int,
    val price: Price,
    @AvroFixed(13) val isbn: String
)

In this model:

  • CurrencyType is an enum class representing the currency type for the book price. It includes USD, EUR, and NOK but can be extended if needed.
  • Price is a data class that holds the amount and currency type of the book’s price.
  • Author is a data class that contains the first and last name of the book’s author.
  • Book is a data class that includes details such as the title, author, publication year, number of pages, price, and ISBN of the book.

Having the model in place, we can add the @Serializable annotation to the signature of every class, like in the following example.

@Serializable
data class Book

Custom Serializers for Unsupported Types in Kotlinx Serialization

When working with Kotlinx serialisation, you might encounter types that do not have default serialisers provided by the library. In our book model, the Year and BigDecimal classes fall into this category. To handle these types, we need to implement custom serialisers. Here is how you can do it.

Every custom serialiser must implement KSerializer interface, providing custom logic for the descriptor field, as well as the deserialize and serialize functions. Let’s do it for the Year type.

class YearSerializer : KSerializer<Year> {

    override val descriptor: SerialDescriptor
        get() = PrimitiveSerialDescriptor("YearSerializer", PrimitiveKind.STRING)

    override fun deserialize(decoder: Decoder): Year {
        return Year.parse(decoder.decodeString())
    }

    override fun serialize(encoder: Encoder, value: Year) {
        encoder.encodeString(value.toString())
    }
}

We started with specifying the descriptor: SerialDescriptor value to describe the serialised form as a string. It is needed by the deserialiser to properly assign Avro type to a given value. In the deserialize method, it converts a string back into a Year object using Year.parse. Conversely, the serialize method transforms a Year object into its string representation. This custom serialiser ensures that Year values are properly converted to and from their string forms during serialisation and deserialisation processes.

Similarly, we can implement the serialiser for the BigDecimal type.

class BigDecimalSerializer : KSerializer<BigDecimal> {
    override val descriptor: SerialDescriptor
        get() = PrimitiveSerialDescriptor("BigDecimal", PrimitiveKind.STRING)

    override fun deserialize(decoder: Decoder): BigDecimal {
        return BigDecimal(decoder.decodeString())
    }

    override fun serialize(encoder: Encoder, value: BigDecimal) {
        encoder.encodeString(value.toString())
    }
}

Updating the Model Classes

With the custom serialisers implemented, the next step is to update the respective fields in our model classes to use these serialisers. In order to do so, you have to add two annotations to members of the Book and the Price classes.

Let’s start with the Book class.

@Serializable
data class Book(
    val title: String,
    val author: Author,
    @Serializable(with = YearSerializer::class) val publicationYear: Year,
    val numberOfPages: Int,
    val price: Price,
    val isbn: String
)

And then move to the Price class.

@Serializable
data class Price(
    @Serializable(with = BigDecimalSerializer::class) var amount: BigDecimal,
    var currency: CurrencyType
) 

The @Serializable annotation can be used for fields too, and allows to specify a custom serialiser for a given one.

Writing a Test to Generate and Validate the Avro Schema

With our model and custom serializers set up, we are ready to write a test to generate the Avro schema from the Book class and validate it against the expected schema. This process ensures that the schema generated by the avro4k library matches our predefined expectations.

Step 1: Define the Expected Schema

First, we define the expected Avro schema as a JSON string. This schema should reflect the structure of our Book data class, including nested fields and custom serializers.

val expectedSchema = """
            {
              "type" : "record",
              "name" : "Book",
              "namespace" : "com.lukaszpalt",
              "fields" : [ {
                "name" : "title",
                "type" : "string"
              }, {
                "name" : "author",
                "type" : {
                  "type" : "record",
                  "name" : "Author",
                  "fields" : [ {
                    "name" : "firstName",
                    "type" : "string"
                  }, {
                    "name" : "lastName",
                    "type" : "string"
                  } ]
                }
              }, {
                "name" : "publicationYear",
                "type" : "string"
              }, {
                "name" : "numberOfPages",
                "type" : "int"
              }, {
                "name" : "price",
                "type" : {
                  "type" : "record",
                  "name" : "Price",
                  "fields" : [ {
                    "name" : "amount",
                    "type" : "string"
                  }, {
                    "name" : "currency",
                    "type" : {
                      "type" : "enum",
                      "name" : "CurrencyType",
                      "symbols" : [ "USD", "EUR", "NOK" ]
                    }
                  } ]
                }
              }, {
                "name" : "isbn",
                "type" : {
                  "type" : "fixed",
                  "name" : "isbn",
                  "size" : 13
                }
              } ]
            }
        """.trimIndent()

Step 2: Generate the Schema from the Model

Next, we generate the actual Avro schema from the Book class using the avro4k library.

val actualSchema = Avro
    .default
    .schema(Book.serializer())
    .toString(true)

This code uses the Avro.default.schema method to generate the schema and converts it to a pretty-printed JSON string for easier comparison.

Step 3: Assert the Schemas Match

Finally, we assert that the generated schema matches the expected schema.

assertEquals(expectedSchema, actualSchema)

Summary

Generating Avro schemas directly from Kotlin data classes is made straightforward with tools like avro4k and kotlinx.serialization. By setting up a Maven project and configuring the Kotlin Maven Plugin, you can seamlessly serialize Kotlin classes into Avro schemas. This approach simplifies integration, especially when you’re the sole producer for a given topics or you define a model for a given domain.

The avro4k library is quite powerful and allows more than I demonstrated in this tutorial. You may be particularly interested in the following sections of its documentation.

  • Schema definition options.
  • Serialisation and deserialisation options.

You can find the complete working code for this tutorial in my GitHub repository.

Deploy Azure Functions with Maven

Azure Functions is a serverless compute service provided by Microsoft Azure, designed to help developers run event-driven code without the need to manage infrastructure. Whether you’re automating workflows, building APIs, or processing data streams, Azure Functions offers a scalable, cost-effective solution. By abstracting away the complexities of server management, it allows developers to focus on writing the logic that drives their applications.

For Java developers, Azure Functions offers first-class support, making it easier than ever to integrate Java-based solutions into the Azure ecosystem. This support includes a robust set of tools and libraries specifically designed to streamline the development and deployment process for Java applications. If you’re eager to learn more, Microsoft has prepared a decent documentation covering this topic. You’ll find there information about the programming framework, how to implement a Function, and how to deploy it. In this post I’d like to take a look at the latter and present it to you in a slightly different light than usual.

Traditionally, deploying Azure Functions involves tools like Azure CLI, Bicep, or Terraform. These tools are robust and widely used, offering powerful features for managing Azure resources. However, Java developers also have a more seamless and better integrated option to choose from – Maven.

Maven, a build automation tool primarily used for Java projects, can be an excellent choice for deploying Azure Functions. The Azure Functions Maven Plugin allows developers to define their infrastructure as code, simplifying the deployment process and ensuring consistency across environments. It’s a great choice if you’re starting your journey with Azure Functions or have no other infrastructure in the cloud. It’s easy to use and allows you to keep your infrastructure definition as close to your build definition as possible.

The Azure Functions Maven Plugin allows you to use an infrastructure-as-code approach similar to Bicep or Terraform. You can declare the desired infrastructure, and the plugin handles provisioning and state management. This means your Function App won’t be reprovisioned every time you build your application, saving time and resources. While this might seem abstract, seeing it in action will clarify its efficiency and ease of use. Let’s dive into the practical aspects of deploying your Azure Functions with Maven and see how this approach can streamline your development process.

I’m using Maven v3.9.6 in this tutorial. You can find a working example of this configuration here.

First, let’s open the pom.xml file in the azure-funcitons-sdk module. Within this file, you’ll find the declaration of the azure-functions-maven-plugin. At the core of this declaration is the <plugin> element. In Maven, a plugin is a collection of goals, which are specific tasks or actions. In our case, we’re using the azure-functions-maven-plugin to facilitate the deployment of Azure Functions. Let’s take a look at the definition. 

<plugin>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-functions-maven-plugin</artifactId>
    <version>${azure.functions.maven.plugin.version}</version>
    <configuration>
        <appName>azure-functions-sdk</appName>
        <resourceGroup>azure-functions-sdk-rg</resourceGroup>
        <appServicePlanName>azure-functions-sdk-sp</appServicePlanName>
        <pricingTier>Consumption</pricingTier>
        <region>norwayeast</region>
        <runtime>
            <os>linux</os>
            <javaVersion>17</javaVersion>
        </runtime>
        <appSettings>
            <property>
                <name>FUNCTIONS_EXTENSION_VERSION</name>
                <value>~3</value>
            </property>
        </appSettings>
    </configuration>
    <executions>
        <execution>
            <goals>
                <goal>package</goal>
            </goals>
        </execution>
    </executions>
</plugin>

There’s a lot of things you might be unfamiliar with in this declaration, so let’s crack them one by one.

Plugin Coordinates

(This is pretty standard, but let’s quickly go through them, especially if you’re new to Maven.)

  • <groupId>: Specifies the group identifier for the plugin. In this case, it’s com.microsoft.azure.
  • <artifactId>: Identifies the plugin itself. We’re using the azure-functions-maven-plugin.
  • <version>: Specifies the version of the plugin. The ${azure.functions.maven.plugin.version} variable points to a specific version, such as 1.24.0 in our example.

Configuration Section

  • <appName>: The name of your Azure Functions application, set to azure-functions-sdk.
  • <resourceGroup>: Defines the Azure resource group where your functions will be deployed, here named azure-functions-sdk-rg.
  • <appServicePlanName>: The Azure App Service Plan where your functions will run, set to azure-functions-sdk-sp.
  • <pricingTier>: Specifies the pricing tier for your functions. It’s set to Consumption, since it’s the cheapest and most popular option. You can also choose Premium or Dedicated tier here.
  • <region>: Determines the Azure region for deployment, set to norwayeast in our case. Note that free trial subscriptions may have limitations on available regions. Check Azure’s documentation for your specific options.
  • <runtime>: Configures the runtime settings:
    • <os>: Specifies the operating system, which is linux.
    • <javaVersion>: Sets the Java version used for the functions runtime, set to 17.
  • <appSettings>: Defines application settings specific to your functions. They’re populated as environment variables to your Function runtime. Here we only specify the Functions version itself, but you can put active profiles or other environment-dependent key-value pairs here.

Executions Section

  • <execution>: Defines when and how the plugin’s goals should be executed. The package goal is responsible for packaging your Azure Functions for deployment. Note that this phase does not deploy anything to Azure; it merely prepares the package. The actual deployment requires invoking a plugin-specific goal, which we will cover later.


Great! Now that we have our configuration in place, the next step is to run the build and deploy our Azure Functions. First, we need to prepare the artefact by executing the clean package command. You can do this from the right-hand side menu in IntelliJ IDEA. This step takes a few seconds to complete on my machine.

Once the packaging is done, the next step is to execute a plugin-specific goal: deploy. It’s essential to use the correct deployment goal to ensure our code gets deployed to Azure. By default, Maven’s deploy goal sends artefacts to the artefact repository specified in the <distributionManagement> section of your POM file, such as Nexus. However, we want to deploy our code directly to Azure. To do this, we need to specify the deployment goal from the Azure Functions Maven Plugin.

Before proceeding, ensure you have Azure CLI installed on your machine. The installation steps vary depending on your operating system, and you can find detailed instructions in the Azure CLI installation guide. I used Homebrew to install it on my machine, but you can choose any way that suits you best.

Once you have Azure CLI installed, open the terminal and type az login. This command initiates a session from your local machine to Azure’s REST API, which is necessary for the plugin to function correctly.

Now, let’s start the deployment process. Issue the following command from the directory where the POM file is located:

mvn azure-functions:deploy

As the deployment progresses, you’ll see logs indicating that all the necessary infrastructure is being provisioned, including the Resource Group, Function App, Application Insights, Storage Account, and App Service Plan. Once the deployment is complete, you can navigate to the Azure Portal to verify that everything is correctly set up and running. This is a great feature itself, because using generic tools like Bicep forces you to define related resources independently. Moreover, all necessary secrets (like connection string to the Storage Account) are added to your App Settings.

And that’s it! By following these steps, you have successfully deployed your Azure Functions using Maven. It may feel a bit odd in the beginning, especially if you’re already experienced with Bicep or Terraform, but I find this approach quite useful in scenarios when I don’t have a lot of infrastructure in Azure, or it’s provided by an external team. In this case an ability to define my Function App in the POM file and use the same tool for build, test, and deployment contributes to a great developer experience. Moreover, automatic creation of all related resources and configuration allows me to get the function up and running in a couple of minutes, making the Maven-based deployment a great option for learning and prototyping.