How to Implement the Pipes and Filters Architecture with Java and Azure

In my previous blog post, I provided an in-depth explanation of the Pipes and Filters architecture, which you can check out here. To recap, the Pipes and Filters architecture breaks down a system into small, self-contained processing components known as filters. Each filter is responsible for performing a specific task or transformation on the data it receives, promoting modularity and reusability. These filters are connected via pipes, which facilitate the flow of data from one filter to the next. This architecture is particularly effective in scenarios involving data integration, processing workflows, transformation pipelines, and stream processing.

In this blog post, I will walk you through a sample implementation of the Pipes and Filters architecture using Java and Azure. Our example project will centre around a chatbot designed to assist in creative thinking activities such as brainstorming.

Sample Project Overview

The goal of this project is to create a tool that integrates with a company’s creative thinking solutions. Specifically, it’s a chatbot that aids teams during brainstorming sessions and other creative activities. The process begins when a user interacts with the application by typing a question, such as “How will the ongoing AI revolution affect the financial industry?” This question is then sent to the application for processing.

How the System Works

  1. Input Validation: The first filter is responsible for validating the user’s question. The question might be in a language that the AI model doesn’t understand, or it might be too long or contain sensitive information. Therefore, the first task is to verify whether the question can be processed further.
  2. Prompt Engineering: If the question is valid, the application uses predefined templates to enrich it. These templates provide context to the AI-powered tool, making the model’s output more valuable. For example, a template might be: “You are a CEO. Given a strategic prompt, you will create X futuristic, hypothetical scenarios that happen Y years from now. The strategic prompt is: Z”. This step is crucial as it leverages prompt engineering to guide the AI model in generating more meaningful responses.
  3. AI Model Interaction: The final step involves sending the enriched prompts to the AI model, which processes them and generates answers. These answers are then displayed back to the user.
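To make the flow above concrete before moving to Azure, here is a minimal in-memory sketch of the three steps chained together. It is only an illustration: the class name, the method names, the shortened template, and the stubbed model call are all assumptions, not the project's actual code.

```java
import java.util.Optional;
import java.util.function.Function;

/** In-memory sketch of the three-step flow; every name here is hypothetical. */
final class BrainstormPipelineSketch {

    // Step 1: reject questions that cannot be processed further (only a blank check here).
    static Optional<String> validate(String question) {
        if (question == null || question.isBlank()) {
            return Optional.empty();
        }
        return Optional.of(question.strip());
    }

    // Step 2: wrap the question in a (shortened, assumed) template that gives the model context.
    static String enrich(String question) {
        return "You are a CEO. The strategic prompt is: " + question;
    }

    // Step 3: stand-in for the AI model call; a real filter would contact the model here.
    static String askModel(String prompt) {
        return "[stub answer] " + prompt;
    }

    // Chains the steps; an invalid question stops the flow after step 1.
    static Optional<String> process(String question) {
        Function<String, String> enrichStep = BrainstormPipelineSketch::enrich;
        Function<String, String> enrichThenAsk =
                enrichStep.andThen(BrainstormPipelineSketch::askModel);
        return validate(question).map(enrichThenAsk);
    }

    public static void main(String[] args) {
        System.out.println(process("How will the ongoing AI revolution affect the financial industry?"));
        System.out.println(process("   "));
    }
}
```

In the real system each step runs as a separate function and the hand-off happens through queues rather than direct method calls, but the order of the steps and the early exit on invalid input are the same idea.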

Implementation Details

The system consists of three filters:

  1. Input Validation Filter: Validates the user’s input according to the application’s data requirements.
  2. Prompt Engineering Filter: Analyses and enriches the validated input to create a prompt.
  3. AI Model Facade Filter: Sends the engineered prompt to the AI model and handles the response.

The First Filter: Input Validation

The first filter is implemented as an Azure Function, and its primary role is to validate the incoming question.

@FunctionName("QuestionValidationFunction")
public HttpResponseMessage validate(
       @HttpTrigger(name = "question",
               methods = {HttpMethod.POST},
               authLevel = AuthorizationLevel.FUNCTION)
       HttpRequestMessage<String> question,
       @QueueOutput(name = "questionQueue",
               queueName = "question-queue",
               connection = "AzureWebJobsStorage")
       OutputBinding<String> questionQueue,
       ExecutionContext executionContext) {
    // Implementation of validation.
}

The validate method, annotated with @FunctionName("QuestionValidationFunction"), is triggered by an HTTP POST request. It takes three parameters: the HTTP request containing the question, an output binding to a storage queue named "question-queue", and the execution context. The method validates the question and, if it is valid, sends it down the pipeline through the queue.
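The validation rules themselves are not shown above, so the following is only a sketch of what such checks might look like in plain Java. The class name, the 500-character limit, and the rejection messages are assumptions; language detection and sensitive-data checks are left out because they would need additional services.

```java
import java.util.Optional;

/** Sketch of checks a validation filter might run; the limit and messages are assumptions. */
final class QuestionValidatorSketch {

    // Hypothetical limit; the post does not specify one.
    static final int MAX_LENGTH = 500;

    /** Returns a rejection reason, or an empty Optional when the question may proceed. */
    static Optional<String> rejectionReason(String question) {
        if (question == null || question.isBlank()) {
            return Optional.of("Question is empty.");
        }
        if (question.length() > MAX_LENGTH) {
            return Optional.of("Question is longer than " + MAX_LENGTH + " characters.");
        }
        return Optional.empty();
    }
}
```

Inside the Azure Function, a present reason could be turned into a 400 response, while an empty result could lead to questionQueue.setValue(...) followed by a success response. Keeping the rules in a plain class like this also makes them easy to unit test without the Functions runtime.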

The Second Filter: Prompt Engineering

The second filter enriches the validated question with a template to maximise the AI model’s response quality.

@FunctionName("PromptEngineeringFunction")
public void sendPrompt(
       @QueueTrigger(
               name = "question",
               queueName = "question-queue",
               connection = "AzureWebJobsStorage")
       String question,
       @QueueOutput(
               name = "promptQueue",
               queueName = "prompt-queue",
               connection = "AzureWebJobsStorage")
       OutputBinding<String> promptQueue,
       ExecutionContext executionContext) {
   // Prompt engineering logic.
}

This function is triggered by messages in the "question-queue". When a new message arrives, the function is invoked, and the question is enriched before being sent to the next queue, "prompt-queue".
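The enrichment step can be as simple as filling a template. The sketch below reuses the CEO template quoted earlier in this post and turns X, Y and Z into format placeholders; the class name, the method signature, and the use of String.format are assumptions made for illustration.

```java
import java.util.Locale;

/** Sketch of template-based enrichment; the placeholder mechanism is an assumption. */
final class PromptTemplateSketch {

    // Template text from the example earlier in the post, with X, Y and Z
    // replaced by format placeholders.
    static final String SCENARIO_TEMPLATE =
            "You are a CEO. Given a strategic prompt, you will create %d futuristic, "
            + "hypothetical scenarios that happen %d years from now. "
            + "The strategic prompt is: %s";

    // Locale.ROOT keeps the number formatting independent of the machine's locale.
    static String buildPrompt(String question, int scenarios, int yearsAhead) {
        return String.format(Locale.ROOT, SCENARIO_TEMPLATE, scenarios, yearsAhead, question);
    }
}
```

In the function shown above, the result of a call such as buildPrompt(question, 3, 10) would be what gets written to promptQueue.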

The Third Filter: AI Model Facade

The third filter handles communication with the AI model. This filter is implemented using the Spring Cloud Function framework, which decouples infrastructure configuration from the business logic. I’ll describe it in detail in the next blog post, but I’ll give you a short description here so you understand the code.

The functions are implemented as Java functional interfaces and autowired into their respective request handlers. The handlers contain the logic that configures the integration with the serverless platform provider. In our case, that is the Azure SDK (examples of which we have seen above). With this setup, you can change the cloud provider by rewriting the handlers (and changing the build definition) without rewriting the functions themselves.

Let’s now look at the function’s code. 

@Bean
public Function<String, String> answer(ModelClient modelClient) {
   // Function’s logic
}

The answer function is a plain Java functional interface (java.util.function.Function) that holds the logic for interacting with the AI model. It is autowired into a handler that manages the integration with Azure.
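To show why this style keeps the business logic portable, here is a sketch of the same idea without Spring. The post does not show what ModelClient looks like, so the single-method interface below is purely an assumption, as is the trimming of the model's reply.

```java
import java.util.function.Function;

/** Sketch of the business function without Spring; ModelClient's shape is assumed. */
final class AnswerFunctionSketch {

    // Hypothetical contract; the real ModelClient is not shown in this post.
    interface ModelClient {
        String complete(String prompt);
    }

    // Mirrors the answer bean: the returned function knows nothing about Azure or queues.
    static Function<String, String> answer(ModelClient modelClient) {
        return prompt -> modelClient.complete(prompt).strip();
    }
}
```

Because the function depends only on an interface, it can be exercised with a fake client, for example answer(p -> "echo: " + p), without any cloud resources.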

@Component
public class AnswerHandler {

   private final Function<String, String> answer;

   public AnswerHandler(Function<String, String> answer) {
       this.answer = answer;
   }

   @FunctionName("answer")
   public void answer(
           @QueueTrigger(
                   name = "promptQueue",
                   queueName = "prompt-queue",
                   connection = "AzureWebJobsStorage")
           String prompt,
           @QueueOutput(
                   name = "answerQueue",
                   queueName = "answer-queue",
                   connection = "AzureWebJobsStorage")
           OutputBinding<String> answerQueue
   ) {
       // Handler’s logic
   }
}

This handler is similar to the previous filters, but it delegates the business logic to the answer function. The answerQueue is used to send the final answer for further consumption. 
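The delegation itself can be very small. The sketch below shows one way the handler body might look; QueueSlot is a local stand-in I made up so the example compiles without the Azure Functions library, and the class and method names are likewise hypothetical.

```java
import java.util.function.Function;

/** Sketch of the handler's delegation; QueueSlot stands in for the platform's output binding. */
final class AnswerHandlerSketch {

    // Minimal stand-in for an output binding so the example is self-contained.
    static final class QueueSlot {
        private String value;

        void setValue(String value) {
            this.value = value;
        }

        String getValue() {
            return value;
        }
    }

    private final Function<String, String> answer;

    AnswerHandlerSketch(Function<String, String> answer) {
        this.answer = answer;
    }

    // Plays the role of the annotated handler method: take the platform input,
    // delegate to the business function, publish the result.
    void handle(String prompt, QueueSlot answerQueue) {
        answerQueue.setValue(answer.apply(prompt));
    }
}
```

The handler contains no model-related logic of its own, which is what makes it the only piece that would need rewriting for another cloud provider.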

Deployment

With all three filters implemented, you can deploy the application to Azure and experiment with the code. The deployment can be done with Maven, as described in this article.

At this point, we have implemented a complete Pipes and Filters architecture using both the Azure SDK and Spring Cloud Function. The system comprises three filters, each responsible for a distinct part of the application’s workflow: input validation, prompt engineering, and AI model communication. The unidirectional data flow is managed primarily by queues, which keeps the filters decoupled from one another and lets each of them scale independently.

Summary

This blog post demonstrates how to implement the Pipes and Filters architecture using Java and Azure for a chatbot that assists in creative thinking activities. The architecture is broken down into three filters: input validation, prompt engineering, and AI model interaction. Each filter handles a specific task in the data processing pipeline, ensuring modularity and reusability. The post also touches on deployment to Azure and shows how Spring Cloud Function separates business logic from infrastructure configuration.

If you’re interested in how this architecture style can be used to implement serverless solutions, and how to work with Azure Functions in Java, check out my Udemy course that covers these topics in detail.  

The working code example can be found on my GitHub.