After reading a whole 10 chapters of the 📕 Rust Book, I decided that the best way to continue learning would be to try building something with it. The opportunity came when my team and I needed a throwaway service: its functionality will eventually be handled by a third party, but in the meantime we needed a manual way for a user to enter that data into the system. The service would be small enough that if everything went wrong I could quickly rewrite it in .NET, and since it will definitely be replaced by new functionality in an existing system, we could relax some of the more complicated-to-implement constraints around data/error handling.
This service would need several capabilities:
- provide http endpoints - Actix seems to be a solid, mature option;
- save data to a Postgres database - the common choice for this seems to be Diesel, but I have an irrational hatred of ORMs, so instead I'm using tokio-postgres with deadpool for connection pooling;
- send messages over Kafka - and rust-rdkafka appears to be an excellent choice.
This service will be responsible for storing data after receiving a http request and then raising an event that will be processed by another service. Later it will need to process and respond to follow-up events about the data, but that is not covered in this article.
With an understanding of the requirements, and an idea of the technology setup, we're ready to bring Sample Service to life 👶
HTTP Endpoint With Actix
Hello World 👋
To start with, we can make a service that runs and responds with a static message on a single HTTP endpoint.
After creating the project with `cargo new`, we need to add a dependency on Actix to our `cargo.toml` file:
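Something like this should be enough (the version number here is illustrative - use whatever the current release is):

```toml
[dependencies]
actix-web = "3"
```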
Then we need to replace the code in `main.rs` with a function to handle our sample endpoint, and appropriate config to run the Actix `HttpServer`:
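A minimal sketch of what `main.rs` could look like at this point (the exact message and binding details are assumptions):

```rust
use actix_web::{web, App, HttpResponse, HttpServer, Responder};

// Responds to GET /hello with a static message.
async fn hello_world() -> impl Responder {
    HttpResponse::Ok().body("Hello world!")
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| App::new().route("/hello", web::get().to(hello_world)))
        .bind("127.0.0.1:8000")?
        .run()
        .await
}
```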
The `hello_world()` function returns a `200 OK` http response with a simple text message in the body. The `main()` function runs a new http server on port 8000, and binds the `hello_world()` function to the path `/hello`.
We can quickly test this by running the server using `cargo run` and making a request locally:
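For example (the output shown is what we'd expect from the handler sketched above):

```sh
$ curl http://localhost:8000/hello
Hello world!
```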
POSTing Data 📬
Just returning static data isn't very useful - we need to be able to receive data from a client and then do something with it. To do that, we'll make a POST endpoint that can deserialize JSON into a type that we'll define.
The data we receive should be identifiable, so it will have an `id` property that we will use a `Uuid` for, as well as some other data that we can store in `String`s:
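A sketch of the type, with illustrative field names standing in for the real data:

```rust
use uuid::Uuid;

pub struct SampleData {
    pub id: Uuid,
    pub first_value: String,
    pub second_value: String,
}
```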
In order to use `Uuid` we need to add a dependency on the uuid crate. We also need to make this type (de)serializable, and Serde seems to be the standard choice for that in Rust. Both should be added to the `cargo.toml` file as dependencies alongside `actix-web`:
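For example (versions are illustrative):

```toml
[dependencies]
actix-web = "3"
serde = { version = "1", features = ["derive"] }
uuid = { version = "0.8", features = ["serde"] }
```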
Cargo dependencies have a neat option to have different features toggled on or off when they're included in the `cargo.toml` file - here we make use of it to toggle on the serde bindings that are provided by the Uuid crate 😍
With the dependencies in place, we can automatically derive the traits needed for serialization of our type:
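With serde's derive feature enabled, that looks something like this:

```rust
use serde::{Deserialize, Serialize};
use uuid::Uuid;

#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SampleData {
    pub id: Uuid,
    pub first_value: String,
    pub second_value: String,
}
```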
The extra `serde` attribute handles translating property names between `snake_case` in our rust code and `camelCase`, which is more common in JSON.
With the data type in place, we can add an endpoint to the application that will receive this data as JSON. Actix uses Extractors to handle deserialization automatically using the serde traits that we have already derived for our type. If deserialization fails, it will return a `400 Bad Request`; otherwise it will pass the deserialized value to our handler function. That makes it very straightforward for us to add an endpoint that receives the data and returns the id for it as a small check that everything is working as we expect.
The handler function is defined:
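Roughly like this (the response body is just an illustrative check):

```rust
use actix_web::{web, HttpResponse, Responder};

// Actix deserializes the JSON body into SampleData before calling this function.
async fn receive_data(data: web::Json<SampleData>) -> impl Responder {
    HttpResponse::Ok().body(format!("Received data with id {}", data.id))
}
```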
And then added to the `HttpServer` in the `main` function:
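Assuming a `/sample` path for the new endpoint, the server setup becomes something like:

```rust
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| {
        App::new()
            .route("/hello", web::get().to(hello_world))
            .route("/sample", web::post().to(receive_data))
    })
    .bind("127.0.0.1:8000")?
    .run()
    .await
}
```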
This is also quick to test using `cargo run` and making a request against the service:
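For example (the JSON properties match the illustrative fields defined above):

```sh
$ curl -X POST http://localhost:8000/sample \
    -H "Content-Type: application/json" \
    -d '{"id": "b5bcf280-5e56-4a42-ba2a-d3a2a41154a5", "firstValue": "one", "secondValue": "two"}'
Received data with id b5bcf280-5e56-4a42-ba2a-d3a2a41154a5
```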
Logging 🪵
When the service is deployed and being used, it would be useful to have it log some information so we could monitor if everything was running as expected. We can start very simply by printing messages when the service starts and stops so we get some output to tell us that it's running:
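A minimal sketch of that:

```rust
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    println!("Starting sample service...");

    // Assign the result so we can log after the server has shut down.
    let result = HttpServer::new(|| {
        App::new()
            .route("/hello", web::get().to(hello_world))
            .route("/sample", web::post().to(receive_data))
    })
    .bind("127.0.0.1:8000")?
    .run()
    .await;

    println!("Sample service stopped");
    result
}
```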
Note how we now assign the result of running the `HttpServer` to a value so that we can print a message just before the service terminates.
This gives us an idea that the service started (or stopped), but it doesn't provide any information about what happens while the service is running - fortunately we can provide Actix with a `Logger` and it will log information about each request. Actix uses Middleware to implement logging, and we can pass it any log implementation that is compatible with the log crate facade. There are sensible logging options to choose from, but for this service we'll use the much more interesting emoji-logger 😀
To set up the logger we first add a dependency in `cargo.toml`:
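For example (version illustrative):

```toml
emoji-logger = "0.1"
```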
Reference the crate with `extern crate emoji_logger;` at the top of `main.rs` and then initialise it in the `main()` function using `emoji_logger::init();`. This will create the logger, but we still need to configure Actix to use it by passing one or more Loggers with the format we want. We can also provide an environment variable to configure the level of log messages that are output - for now we can set that in code, but later we should read it from the environment so that it's easy to configure between environments (for example, we might want to output a lot of data when running locally, but find that too spammy when the service is running in production).
With our logger initialised and configured with Actix we now have the following code:
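Putting it together, `main()` now looks roughly like this (a sketch, assuming the logger follows the usual `RUST_LOG` convention for the log level):

```rust
extern crate emoji_logger;

use actix_web::{middleware::Logger, web, App, HttpServer};

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    // Hard-coded for now; later this should come from the environment.
    std::env::set_var("RUST_LOG", "info");
    emoji_logger::init();

    println!("Starting sample service...");

    let result = HttpServer::new(|| {
        App::new()
            .wrap(Logger::default())
            .route("/hello", web::get().to(hello_world))
            .route("/sample", web::post().to(receive_data))
    })
    .bind("127.0.0.1:8000")?
    .run()
    .await;

    println!("Sample service stopped");
    result
}
```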
We can double check this by running the service with `cargo run`, making some sample HTTP requests against it, and then checking the console output, which should show a log line for each request.
Actix supports a bunch of different format options to configure what should be included in log messages if we wanted to change the output, but the default is good enough for now.
Saving Data In Postgres
Config 🗒
Now that our service can receive data, we need to do something with it - and we can start by saving it to a database.
There are many ways to setup a database server that won't be covered here, but as a prerequisite for our code we will need a table to work against:
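Something along these lines - the table and column names are illustrative and need to match whatever the insert code uses later:

```sql
CREATE TABLE sample_data (
    id uuid PRIMARY KEY,
    first_value text NOT NULL,
    second_value text NOT NULL
);
```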
For providing connection details to our service it's best if we can use an external configuration source so that we can change the values without having to recompile and deploy. We can use environment variables for this purpose, and use the rust-dotenv crate to keep configuration values for local development in a config file.
To do this we add `dotenv` to our dependencies:
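In `cargo.toml` (version illustrative):

```toml
dotenv = "0.15"
```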
Add a `.env` file in the same directory as `cargo.toml` containing the configuration values we want to use:
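For example, with illustrative variable names and local defaults:

```
PG_HOST=localhost
PG_PORT=5432
PG_USER=postgres
PG_PASSWORD=password
PG_DBNAME=sample
```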
Then reference `dotenv` in our code and load the values in our `main()` function:
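The relevant part is only a couple of lines at the start of `main()`:

```rust
use dotenv::dotenv;

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    // Load values from .env into the process environment; does nothing if the file is missing.
    dotenv().ok();

    // ...logger setup and HttpServer as before...
    Ok(())
}
```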
With the values defined and loaded from the environment, they can be moved into a `Config` object that will let us create a connection pool for the database, which can then be passed to Actix to provide to handlers when needed. There are several dependencies needed to communicate with the database:
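The new entries in `cargo.toml` look something like this - versions are illustrative, and the note below explains why the `deadpool-postgres` version matters:

```toml
tokio-postgres = { version = "0.5", features = ["with-uuid-0_8", "with-serde_json-1"] }
deadpool-postgres = "0.5"
```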
⚠ For `deadpool-postgres` it's important (currently) to use this version because `actix-web` and `deadpool-postgres` reference different, incompatible versions of tokio in their latest versions.

This also enables support in `tokio-postgres` for using `Uuid` values, and using serde to handle serialization of JSON values, by toggling those features on.
A `DbConfig` type can give us a straightforward wrapper around the `deadpool_postgres::Config` type and allow us to encapsulate loading configuration values into it:
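A sketch of the wrapper, assuming the environment variable names from the `.env` example above:

```rust
use deadpool_postgres::Config;
use std::env;

// Thin wrapper that knows how to populate deadpool's config from the environment.
pub struct DbConfig(Config);

impl DbConfig {
    pub fn from_env() -> DbConfig {
        let mut cfg = Config::default();
        cfg.host = env::var("PG_HOST").ok();
        cfg.port = env::var("PG_PORT").ok().and_then(|p| p.parse().ok());
        cfg.user = env::var("PG_USER").ok();
        cfg.password = env::var("PG_PASSWORD").ok();
        cfg.dbname = env::var("PG_DBNAME").ok();
        DbConfig(cfg)
    }
}
```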
We can then use this in a factory function to create a connection pool that we can pass to Actix so that it in turn can pass it on to handlers that need to access the database.
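Roughly:

```rust
use deadpool_postgres::Pool;
use tokio_postgres::NoTls;

// Built once at startup; configuration errors should fail fast here.
pub fn create_pool() -> Pool {
    let config = DbConfig::from_env();
    config.0.create_pool(NoTls).unwrap()
}
```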
This function uses `unwrap()` instead of returning a `Result` with any potential error because it will be run only once, on startup, and any errors would be configuration errors that should be corrected before running the service. Additionally, `deadpool` won't try to establish a connection until one is needed, which should rule out transient database connectivity errors preventing the service from starting - so panicking is (imo) appropriate 😅
Using the `data()` method when setting up the `HttpServer` will allow Actix to give handlers access to the connection pool:
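Inside `main()` that looks something like this (note the `move` closure so the pool can be cloned into each worker):

```rust
// in main(), after loading the .env values
let pool = create_pool();

HttpServer::new(move || {
    App::new()
        .wrap(Logger::default())
        .data(pool.clone())
        .route("/hello", web::get().to(hello_world))
        .route("/sample", web::post().to(receive_data))
})
.bind("127.0.0.1:8000")?
.run()
.await
```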
Saving Data 💾
Now that we have a way to access the database, we should save any data that we receive to it.
We use a `deadpool_postgres::Client` to communicate with the database through the connection pool, and will need some `SampleData` to insert into it:
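A sketch of the insert, assuming the table defined earlier:

```rust
use deadpool_postgres::Client;

// Inserts a single row; the statement must match the sample_data table definition.
async fn insert_sample(client: &Client, data: &SampleData) -> Result<u64, tokio_postgres::Error> {
    client
        .execute(
            "INSERT INTO sample_data (id, first_value, second_value) VALUES ($1, $2, $3)",
            &[&data.id, &data.first_value, &data.second_value],
        )
        .await
}
```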
The `Data<>` extractor from Actix allows us to get access to application state, which in this case is the connection pool we created earlier. The `Pool` in turn allows us to get a `Client` to communicate with the database:
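The handler now looks roughly like this (still using `unwrap()`, which the next section improves):

```rust
use actix_web::web::{Data, Json};
use actix_web::{HttpResponse, Responder};
use deadpool_postgres::{Client, Pool};

async fn receive_data(pool: Data<Pool>, data: Json<SampleData>) -> impl Responder {
    let data = data.into_inner();

    // Panics if getting a client or the insert fails - addressed in the error handling section.
    let client: Client = pool.get().await.unwrap();
    insert_sample(&client, &data).await.unwrap();

    HttpResponse::Ok().body(format!("Received data with id {}", data.id))
}
```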
We don't need to adjust the server configuration to accommodate any of the changes we've made to the handler here, and this can be tested by running the service with `cargo run` and then querying the database after making a successful POST request.
Handling Errors ❌
The `receive_data` function works properly for the happy path of events, but if there are any problems connecting to the database or inserting a new record, it will panic. Actix will catch the panic for us and return a `500 Internal Server Error` to the client, but that doesn't provide a very good experience, and panicking for every error like that is very bad practice.
To improve things, we can create a custom error type that implements the `ResponseError` trait and have the handler return a `Result`; then Actix will be able to return a better response to the client. The error type will also need to implement some other traits, but by taking a dependency on the derive_more crate, these can be generated.
With that in place we can define an error type that fits the few errors we want to handle:
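A sketch of the error type (with `derive_more` added to `cargo.toml`) - the case names are illustrative, with one for a conflicting/duplicate record and one for connection problems:

```rust
use actix_web::{HttpResponse, ResponseError};
use derive_more::Display;

#[derive(Debug, Display)]
pub enum SampleError {
    #[display(fmt = "Data could not be saved, it may already exist")]
    DataConflict,
    #[display(fmt = "Could not connect to the database")]
    DbConnection,
}

impl ResponseError for SampleError {
    fn error_response(&self) -> HttpResponse {
        match self {
            SampleError::DataConflict => HttpResponse::BadRequest().body(self.to_string()),
            SampleError::DbConnection => HttpResponse::InternalServerError().body(self.to_string()),
        }
    }
}
```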
The error types are kept as cases in an `enum` and we match over them to provide a different `HttpResponse` that Actix will return to clients if our handler returns a failing result. The responses implemented here are very simple, but if needed, we could also add more data to the response to help clients understand the problem.
In the handler, the calls to `unwrap()` can now be replaced with `map_err` calls to convert the error received from database code into the new `ResponseError` type; then we can use the `?` operator to return the failure immediately:
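The handler then looks something like this (mapping any insert failure to the conflict case is a simplification that fits our relaxed error handling):

```rust
async fn receive_data(
    pool: Data<Pool>,
    data: Json<SampleData>,
) -> Result<HttpResponse, SampleError> {
    let data = data.into_inner();

    // map_err converts library errors into SampleError so `?` can return early.
    let client = pool.get().await.map_err(|_| SampleError::DbConnection)?;
    insert_sample(&client, &data)
        .await
        .map_err(|_| SampleError::DataConflict)?;

    Ok(HttpResponse::Ok().body(format!("Received data with id {}", data.id)))
}
```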
Changing the return type to `Result` also requires wrapping the return value in `Ok()` so that the types line up properly.
The main difference from the outside is that now submitting two requests with the same id will return a `400 Bad Request` instead of panicking and returning a `500 Internal Server Error`.
Sending Messages Over Kafka
Config 🗒
The final requirement for the service is that it can raise events about new sample data over Kafka, which will be done using a `FutureProducer` from rust-rdkafka.
When connecting to a local Kafka instance we typically wouldn't need any client authentication, however when connecting to a production Kafka cluster we probably need to use more secure methods; in this case we need to use SASL to connect. As long as we include the necessary dependencies for both approaches, we can vary the configuration options that we pass in to ensure that we use the proper method in each environment.
In `cargo.toml` we add these dependencies:
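Something like this - the versions are illustrative, the feature flags are the important part:

```toml
rdkafka = { version = "0.25", features = ["ssl", "sasl"] }
sasl2-sys = { version = "*", features = ["vendored"] }
```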
The `rdkafka` dependency should have the `ssl` and `sasl` features toggled on, and the `sasl2-sys` dependency is included using `*` as the version so that it will just take whichever version `rdkafka` uses; toggling on its vendored feature ensures that `libsasl2` will be bundled with the service rather than relying on the underlying system to already have this library.
We want to load our configuration from the environment, but the properties included in the configuration might also vary between environments. For example, locally we might only need to set `bootstrap.servers`, but in production we would rely on setting `security.protocol` and the properties pertaining to our chosen authentication method. A simple way of ensuring that we load all the configuration values for each environment, without having to model or handle missing values, is to add a prefix to their names in the environment and iterate over them, adding them directly to the `rdkafka::ClientConfig`. With that approach we can define a function to create a producer:
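A sketch of that function, assuming a `KAFKA_` prefix and a simple underscores-to-dots mapping for the property names:

```rust
use rdkafka::config::ClientConfig;
use rdkafka::producer::FutureProducer;
use std::env;

fn create_producer() -> FutureProducer {
    let mut config = ClientConfig::new();

    // e.g. KAFKA_BOOTSTRAP_SERVERS=localhost:9092 becomes bootstrap.servers=localhost:9092
    for (key, value) in env::vars().filter(|(k, _)| k.starts_with("KAFKA_")) {
        let property = key
            .trim_start_matches("KAFKA_")
            .to_lowercase()
            .replace("_", ".");
        config.set(property.as_str(), value.as_str());
    }

    // Values that shouldn't vary between environments are set directly.
    config.set("message.timeout.ms", "5000");

    config.create().expect("failed to create Kafka producer")
}
```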
This function can also set any config values that we don't want to vary across environments (this sets `message.timeout.ms` directly), and will panic if it can't create a producer, because we expect that to be caused by a configuration error that we would want to correct before running the service.
With the appropriate config value in our .env file:
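For local development that might just be the broker address (using the illustrative `KAFKA_` prefix from above):

```
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
```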
The main function can be modified to create a `FutureProducer` on startup that can be used by our handlers:
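Roughly:

```rust
// in main(), after loading the .env values
let pool = create_pool();
let producer = create_producer();

HttpServer::new(move || {
    App::new()
        .wrap(Logger::default())
        .data(pool.clone())
        .data(producer.clone())
        .route("/hello", web::get().to(hello_world))
        .route("/sample", web::post().to(receive_data))
})
.bind("127.0.0.1:8000")?
.run()
.await
```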
This uses the same `data()` method as for the database connection pool to include the `FutureProducer` as part of application state in Actix.
Sending Messages 📨
With the connection in place, we need to actually produce messages to send over Kafka. We'll send messages as JSON, and will wrap them in a standard envelope to make handling a variety of messages easier for a consumer.
`serde_json` has a handy `json!` macro that can take a JSON literal and return a `Value`, and we can use this to wrap the data to be sent in our envelope format. The envelope format contains a message type, which will always be the same as we only send one type of message, and a `messageId` field that we will set using a new `Uuid`.
To do this, we need to add a dependency on `serde_json` and an extra feature to the `uuid` dependency:
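In `cargo.toml` (versions illustrative - the `v4` feature is what lets us generate new ids):

```toml
serde_json = "1"
uuid = { version = "0.8", features = ["serde", "v4"] }
```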
With those in place we can define a function that takes a `FutureProducer` and some `SampleData`, then wraps the data in the envelope format and sends it to Kafka with the producer:
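A sketch of the send function - the envelope field names and message type are illustrative:

```rust
use rdkafka::producer::{FutureProducer, FutureRecord};
use serde_json::json;
use std::time::Duration;
use uuid::Uuid;

async fn send_sample_message(producer: &FutureProducer, data: &SampleData) {
    // Wrap the data in the envelope format expected by consumers.
    let message = json!({
        "messageId": Uuid::new_v4(),
        "messageType": "SampleReceived",
        "data": data,
    });

    let key = data.id.to_string();
    let payload = message.to_string();
    let record = FutureRecord::to("sample-received").key(&key).payload(&payload);

    // A zero Duration means: fail immediately if the producer queue is full.
    producer.send(record, Duration::from_secs(0)).await.unwrap();
}
```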
This creates a `FutureRecord` to pass the message to the `sample-received` topic on Kafka, and provides a `Duration` to the `producer.send` method to specify how long it should wait if the queue is full when trying to send the message - setting this to `0` means that it will fail immediately if the message cannot be enqueued.
In the case that this message does fail to send, the `unwrap()` call will cause it to panic, even though the Kafka library returns a result. We can add a new error case to the `SampleError` type so that we can handle the error properly and return a message to the client to explain what failed:
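With an illustrative new case for failed sends:

```rust
#[derive(Debug, Display)]
pub enum SampleError {
    #[display(fmt = "Data could not be saved, it may already exist")]
    DataConflict,
    #[display(fmt = "Could not connect to the database")]
    DbConnection,
    #[display(fmt = "Data was saved but the event could not be sent")]
    KafkaSend,
}

impl ResponseError for SampleError {
    fn error_response(&self) -> HttpResponse {
        match self {
            SampleError::DataConflict => HttpResponse::BadRequest().body(self.to_string()),
            SampleError::DbConnection | SampleError::KafkaSend => {
                HttpResponse::InternalServerError().body(self.to_string())
            }
        }
    }
}
```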
ℹ It's worth noting that the `KafkaError` returned from `rdkafka` provides a lot more detail about what went wrong when sending to Kafka, which could be useful to refine this message in the future - but we won't go into that here.
With the error type updated, the send function can be updated to return a `Result`:
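Roughly:

```rust
async fn send_sample_message(
    producer: &FutureProducer,
    data: &SampleData,
) -> Result<(), SampleError> {
    let message = json!({
        "messageId": Uuid::new_v4(),
        "messageType": "SampleReceived",
        "data": data,
    });

    let key = data.id.to_string();
    let payload = message.to_string();
    let record = FutureRecord::to("sample-received").key(&key).payload(&payload);

    // Convert a failed send into our error type instead of panicking.
    producer
        .send(record, Duration::from_secs(0))
        .await
        .map_err(|_| SampleError::KafkaSend)?;

    Ok(())
}
```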
Then the `receive_data` handler can be updated to take a `FutureProducer` from application state, and send a sample message to Kafka after saving the data to the database:
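Putting it all together, the handler looks something like:

```rust
async fn receive_data(
    pool: Data<Pool>,
    producer: Data<FutureProducer>,
    data: Json<SampleData>,
) -> Result<HttpResponse, SampleError> {
    let data = data.into_inner();

    let client = pool.get().await.map_err(|_| SampleError::DbConnection)?;
    insert_sample(&client, &data)
        .await
        .map_err(|_| SampleError::DataConflict)?;

    // Only raise the event once the data has been stored successfully.
    send_sample_message(&producer, &data).await?;

    Ok(HttpResponse::Ok().body(format!("Received data with id {}", data.id)))
}
```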
With all the pieces in place, we can test manually by running with `cargo run`, making a http request to the endpoint and then consuming the Kafka topic directly (I'll use Kaf for this):
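For example:

```sh
$ curl -X POST http://localhost:8000/sample \
    -H "Content-Type: application/json" \
    -d '{"id": "b5bcf280-5e56-4a42-ba2a-d3a2a41154a5", "firstValue": "one", "secondValue": "two"}'

$ kaf consume sample-received
```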
This shows that the message went to our topic in the format we expected 🎉
Wrapping Up
With all this in place we should now have a service that can receive data over http and then send it to both a Postgres database and a Kafka topic. There are a bunch of potential improvements that could be made - for example, we might end up in a bad state where we write to the database successfully but never send the value over Kafka, which might cause issues in downstream systems - but this should meet our needs for most cases. 🙂
Hopefully this is a useful guide for putting all these pieces together, and as I said at the start, I'm not a Rust expert - this is mostly a walkthrough of what I ended up doing after finding various samples and snippets for the individual pieces.
If you have any thoughts/suggestions/feelings/reckonings/anecdotes or otherwise, feel free to reach out on twitter 🙌