After reading ten whole chapters of the 📕 Rust Book, I decided that the best way to continue learning would be to try building something with it. The opportunity came when my team and I needed a throwaway service to provide a manual way for a user to enter data into the system now, with functionality that will eventually be handled by a third party. The service would be small enough that if everything went wrong I could quickly rewrite it in .NET, and since it will definitely be replaced by new functionality in an existing system, we could relax some of the more complicated-to-implement constraints around data/error handling.
This service would need several capabilities:
- provide http endpoints - Actix seems to be a solid, mature option;
- save data to a Postgres database - the common choice for this seems to be Diesel, but I have an irrational hatred of ORMs, so instead I'm using tokio-postgres with deadpool for connection pooling;
- send messages over Kafka - and rust-rdkafka appears to be an excellent choice.
This service will be responsible for storing data after receiving a http request and then raising an event that will be processed by another service. Later it will need to process and respond to follow-up events about the data, but that is not covered in this article.
With an understanding of the requirements, and an idea of the technology setup, we're ready to bring Sample Service to life 👶
HTTP Endpoint With Actix
Hello World 👋
To start with, we can make a service that runs and responds with a static message on a single HTTP endpoint.
After creating the project with cargo new, we need to add a dependency on Actix to our Cargo.toml file:
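The original dependency snippet isn't shown here, but a minimal sketch might look like this (the actix-web version is an assumption):

```toml
[dependencies]
# web framework for the HTTP endpoints
actix-web = "3"
```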
Then we need to replace the code in main.rs with a function to handle our sample endpoint, and appropriate config to run the Actix HttpServer:
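A minimal sketch of that main.rs, assuming actix-web 3 and a hypothetical /sample path (the exact path isn't preserved in the text):

```rust
use actix_web::{get, App, HttpResponse, HttpServer, Responder};

// Responds to GET requests with a static text message.
#[get("/sample")]
async fn hello_world() -> impl Responder {
    HttpResponse::Ok().body("Hello world!")
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    // Run a new HTTP server on port 8000 and register the handler.
    HttpServer::new(|| App::new().service(hello_world))
        .bind("0.0.0.0:8000")?
        .run()
        .await
}
```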
The hello_world() function returns an HTTP response with a simple text message in the body. The main() function runs a new HTTP server on port 8000, and binds the hello_world() function to its path.
We can quickly test this by running the server using cargo run and making a request locally:
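For example, with curl (assuming the /sample path from the sketch above):

```shell
curl http://localhost:8000/sample
# responds with the static text message
```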
POSTing Data 📬
Just returning static data isn't very useful - we need to be able to receive data from a client and then do something with it. To do that, we'll make a POST endpoint that can deserialize JSON into a type that we'll define.
The data we receive should be identifiable, so it will have an id property that we will use a Uuid for, as well as some other data that we can store alongside it.
In order to use Uuid we need to add a dependency on the uuid crate. We also need to make this type (de)serializable, and Serde seems to be the standard choice for that in Rust. Both should be added to the Cargo.toml file as dependencies alongside Actix:
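A sketch of those dependency entries (versions are assumptions):

```toml
[dependencies]
serde = { version = "1", features = ["derive"] }
# toggle on the serde bindings provided by the uuid crate
uuid = { version = "0.8", features = ["serde"] }
```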
Cargo dependencies have a neat option to have different features toggled on or off when they're included in the Cargo.toml file - here we make use of it to toggle on the serde bindings that are provided by the uuid crate 😍
With the dependencies in place, we can automatically derive the traits needed for serialization of our type:
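A sketch of the type, assuming a single hypothetical string field (the real field names aren't preserved in the text):

```rust
use serde::{Deserialize, Serialize};
use uuid::Uuid;

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] // snake_case in Rust <-> camelCase in JSON
pub struct SampleData {
    pub id: Uuid,
    pub some_value: String, // hypothetical extra data field
}
```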
The serde attribute handles translating property names between snake_case in our Rust code and camelCase, which is more common in JSON.
With the data type in place, we can add an endpoint to the application that will receive this data as JSON. Actix uses Extractors to handle deserialization automatically using the serde traits that we have already derived for our type. If deserialization fails, it will return a 400 Bad Request; otherwise it will pass the deserialized value to our handler function. That makes it very straightforward for us to add an endpoint that receives the data and returns the id for it as a small check that everything is working as we expect.
The handler function is defined:
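A sketch of the handler, using Actix's Json extractor (the /sample path is an assumption):

```rust
use actix_web::{post, web, HttpResponse, Responder};

// Actix deserializes the JSON body into SampleData before calling this,
// returning 400 Bad Request itself if deserialization fails.
#[post("/sample")]
async fn receive_data(data: web::Json<SampleData>) -> impl Responder {
    HttpResponse::Ok().body(data.id.to_string())
}
```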
And then added to the HttpServer in the main() function:
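Registering the handler is one extra service call in the App setup, sketched here:

```rust
HttpServer::new(|| App::new().service(hello_world).service(receive_data))
    .bind("0.0.0.0:8000")?
    .run()
    .await
```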
This is also quick to test using cargo run and making a request against the service:
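For example (the id value and field name follow the hypothetical type above):

```shell
curl -X POST http://localhost:8000/sample \
  -H "Content-Type: application/json" \
  -d '{"id": "74d3cb07-4d49-4cb6-94a7-50e2fb05e78c", "someValue": "hello"}'
# responds with the id from the request body
```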
When the service is deployed and being used, it would be useful to have it log some information so we could monitor if everything was running as expected. We can start very simply by printing messages when the service starts and stops so we get some output to tell us that it's running:
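A sketch of that change - printing before starting and again after the server future completes:

```rust
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    println!("Starting sample service...");
    // Assign the result so we can print a message before terminating.
    let result = HttpServer::new(|| App::new().service(receive_data))
        .bind("0.0.0.0:8000")?
        .run()
        .await;
    println!("Sample service stopped");
    result
}
```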
Note how we now assign the result of running the HttpServer to a value so that we can print a message just before the service terminates.
This gives us an idea that the service started (or stopped), but it doesn't provide any information about what happens while the service is running - fortunately we can provide Actix with a Logger and it will log information about each request. Actix uses Middleware to implement logging, and we can pass it any log implementation that is compatible with the log crate facade. There are sensible logging options to choose from, but for this service we'll use the much more interesting emoji-logger 😀
To set up the logger we first add a dependency in Cargo.toml:
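A sketch of the entry (the version is an assumption):

```toml
[dependencies]
emoji-logger = "0.1"
```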
We reference the crate with extern crate emoji_logger; at the top of main.rs and then initialise it in the main() function using emoji_logger::init();. This will create the logger, but we still need to configure Actix to use it by passing one or more Loggers with the format we want. We can also provide an environment variable to configure the level of log messages that are output - for now we can set that in code, but later we should read that from the environment so that it's easy to configure between different environments (for example, we might want to output a lot of data when running locally, but find that would be too spammy when the service is running in production).
With our logger initialised and configured with Actix we now have the following code:
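A sketch of the combined setup, assuming the default Logger format and the standard RUST_LOG level variable:

```rust
extern crate emoji_logger;

use actix_web::{middleware::Logger, App, HttpServer};

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    // Hard-coded for now; later this should come from the environment.
    std::env::set_var("RUST_LOG", "info");
    emoji_logger::init();

    println!("Starting sample service...");
    let result = HttpServer::new(|| {
        App::new()
            .wrap(Logger::default()) // log each request via the log facade
            .service(receive_data)
    })
    .bind("0.0.0.0:8000")?
    .run()
    .await;
    println!("Sample service stopped");
    result
}
```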
We can double check this by running the service with cargo run, making some sample HTTP requests against it, and then checking the console output, which should show a log line for each request.
Actix supports a bunch of different format options to configure what should be included in log messages if we wanted to change the output, but the default is good enough for now.
Saving Data In Postgres
Now that our service can receive data, we need to do something with it - and we can start by saving it to a database.
There are many ways to setup a database server that won't be covered here, but as a prerequisite for our code we will need a table to work against:
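The exact schema isn't preserved in the text; a hypothetical table matching the SampleData type above might be:

```sql
CREATE TABLE sample_data (
    id uuid PRIMARY KEY,
    some_value text NOT NULL  -- hypothetical data column
);
```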
For providing connection details to our service it's best if we can use an external configuration source so that we can change the values without having to recompile and deploy. We can use environment variables for this purpose, and use the rust-dotenv crate to keep configuration values for local development in a config file.
To do this we add dotenv to our dependencies:
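A sketch of the entry (version is an assumption):

```toml
[dependencies]
dotenv = "0.15"
```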
Then we create a .env file in the same directory as Cargo.toml containing the configuration values we want to use:
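The variable names here are hypothetical, chosen to match the database config sketched later:

```shell
PG_HOST=localhost
PG_USER=postgres
PG_PASSWORD=example
PG_DBNAME=sample
```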
Next we reference dotenv in our code and load the values in our main() function:
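Loading is a single call at the top of main(), sketched here:

```rust
use dotenv::dotenv;

fn main() {
    // Populate the process environment from the .env file, if present.
    dotenv().ok();
    // ...rest of startup...
}
```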
With the values defined and loaded from the environment, they can be moved into a Config object that will let us create a connection pool for the database that can be passed to Actix to provide to handlers when needed. There are several dependencies needed to communicate with the database:
ℹ For deadpool-postgres it's important (currently) to use a specific version, because deadpool-postgres and tokio-postgres reference different, incompatible versions of tokio in their latest versions.
This also enables support in tokio-postgres for using Uuid values, and for using serde to handle serialization of JSON values, by toggling those features on.
A DbConfig type can give us a straightforward wrapper around the deadpool_postgres::Config type and allow us to encapsulate loading configuration values into it:
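A sketch, assuming hypothetical PG_* environment variable names:

```rust
use deadpool_postgres::Config;

pub struct DbConfig;

impl DbConfig {
    // Load connection settings from the environment into deadpool's Config.
    // The PG_* variable names are hypothetical.
    pub fn from_env() -> Config {
        let mut cfg = Config::default();
        cfg.host = std::env::var("PG_HOST").ok();
        cfg.user = std::env::var("PG_USER").ok();
        cfg.password = std::env::var("PG_PASSWORD").ok();
        cfg.dbname = std::env::var("PG_DBNAME").ok();
        cfg
    }
}
```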
We can then use this in a factory function to create a connection pool that we can pass to Actix so that it in turn can pass it on to handlers that need to access the database.
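A sketch of the factory function, assuming deadpool-postgres 0.5's create_pool signature:

```rust
use deadpool_postgres::Pool;
use tokio_postgres::NoTls;

pub fn create_pool() -> Pool {
    // Panicking here is fine: any error is a configuration error that
    // should be fixed before the service runs.
    DbConfig::from_env().create_pool(NoTls).unwrap()
}
```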
This function uses unwrap() instead of returning a Result with any potential error because it will be run only once, on startup, and any errors would be configuration errors that should be corrected before running the service. Additionally, deadpool won't try to establish a connection until one is needed, which should rule out transient database connectivity errors preventing the service from starting - so panicking is (imo) appropriate 😅
Using the data() method when setting up the HttpServer will allow Actix to give handlers access to the connection pool:
Saving Data 💾
Now that we have a way to access the database, we should save any data that we receive to it.
We use a deadpool_postgres::Client to communicate with the database through the connection pool, and will need some SampleData to insert into it:
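A sketch of an insert function, using the hypothetical sample_data table and columns:

```rust
use deadpool_postgres::Client;

async fn insert_sample(
    client: &Client,
    data: &SampleData,
) -> Result<u64, tokio_postgres::Error> {
    client
        .execute(
            // table/column names are hypothetical
            "INSERT INTO sample_data (id, some_value) VALUES ($1, $2)",
            &[&data.id, &data.some_value],
        )
        .await
}
```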
The Data<> extractor from Actix allows us to get access to application state, which in this case is the connection pool we created earlier. The Pool in turn allows us to get a Client to communicate with the database:
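A sketch of the updated handler (the unwrap() calls get cleaned up in the error-handling section):

```rust
use actix_web::{post, web, HttpResponse, Responder};
use deadpool_postgres::Pool;

#[post("/sample")]
async fn receive_data(
    data: web::Json<SampleData>,
    pool: web::Data<Pool>,
) -> impl Responder {
    let client = pool.get().await.unwrap();
    insert_sample(&client, &data).await.unwrap();
    HttpResponse::Ok().body(data.id.to_string())
}
```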
We don't need to adjust the server configuration to accommodate any of the changes we've made to the handler here, and this can be tested by running the service with cargo run and then querying the database after making a successful POST request.
Handling Errors ❌
The receive_data function works properly for the happy path, but if there are any problems connecting to the database or inserting a new record, it will panic. Actix will catch the panic for us and return a 500 Internal Server Error to the client, but that doesn't provide a very good experience, and panicking for every error like that is very bad practice.
To improve things, we can create a custom error type that implements the ResponseError trait and have the handler return a Result; Actix will then be able to return a better response to the client.
The error type will also need to implement some other traits, but by taking a dependency on the derive_more crate, these can be generated.
With that in place we can define an error type that fits the few errors we want to handle:
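A sketch of the type - the case names and messages are assumptions:

```rust
use actix_web::{HttpResponse, ResponseError};
use derive_more::Display;

#[derive(Debug, Display)]
pub enum SampleError {
    #[display(fmt = "could not get a database connection")]
    DbConnection,
    #[display(fmt = "a sample with that id already exists")]
    DuplicateId,
}

impl ResponseError for SampleError {
    fn error_response(&self) -> HttpResponse {
        match self {
            SampleError::DbConnection => {
                HttpResponse::InternalServerError().body(self.to_string())
            }
            SampleError::DuplicateId => {
                HttpResponse::BadRequest().body(self.to_string())
            }
        }
    }
}
```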
The error types are kept as cases in an enum, and we match over them to provide a different HttpResponse that Actix will return to clients if our handler returns a failing result. The responses implemented here are very simple, but if needed we could also add more data to the response to help clients understand the problem.
In the handler, the calls to unwrap() can now be replaced with map_err calls to convert the error received from database code into the new ResponseError type; then we can use the ? operator to return the failure immediately:
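A sketch of the reworked handler, using the hypothetical error cases from above:

```rust
#[post("/sample")]
async fn receive_data(
    data: web::Json<SampleData>,
    pool: web::Data<Pool>,
) -> Result<HttpResponse, SampleError> {
    let client = pool.get().await.map_err(|_| SampleError::DbConnection)?;
    insert_sample(&client, &data)
        .await
        .map_err(|_| SampleError::DuplicateId)?;
    Ok(HttpResponse::Ok().body(data.id.to_string()))
}
```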
Changing the return type to Result also requires wrapping the return value in Ok() so that the types line up properly.
The main difference from the outside is that now submitting two requests with the same id will return a 400 Bad Request instead of panicking and returning a 500 Internal Server Error.
Sending Messages Over Kafka
The final requirement for the service is that it can raise events about new sample data over Kafka, which will be done using a FutureProducer from rust-rdkafka.
When connecting to a local Kafka instance we typically wouldn't need any client authentication, however when connecting to a production Kafka cluster we probably need to use more secure methods; in this case we need to use SASL to connect. As long as we include the necessary dependencies for both approaches, we can vary the configuration options that we pass in to ensure that we use the proper method in each environment.
In Cargo.toml we add these dependencies:
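A sketch (the rdkafka version is an assumption):

```toml
[dependencies]
rdkafka = { version = "0.23", features = ["ssl", "sasl"] }
# "*" so it takes whichever version rdkafka uses; vendored bundles libsasl2
sasl2-sys = { version = "*", features = ["vendored"] }
```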
The rdkafka dependency should have sasl features toggled on, and the sasl2-sys dependency is included using * as the version so that it will just take whichever version rdkafka uses. Toggling on the vendored feature ensures that libsasl2 will be bundled with the service rather than relying on the underlying system to already have this library.
We want to load our configuration from the environment, but the properties included in the configuration might also vary between environments. For example, locally we might only need to set bootstrap.servers, but in production we would rely on setting security.protocol and the properties pertaining to our chosen authentication method. A simple way of ensuring that we load all the configuration values for each environment, without having to model or handle missing values, is to add a prefix to their names in the environment and iterate over them, adding them directly to the rdkafka::ClientConfig. With that approach we can define a function to create a producer:
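A sketch, assuming a hypothetical KAFKA_ prefix and a name translation like KAFKA_BOOTSTRAP_SERVERS -> bootstrap.servers:

```rust
use rdkafka::config::ClientConfig;
use rdkafka::producer::FutureProducer;

pub fn create_producer() -> FutureProducer {
    let mut config = ClientConfig::new();

    // Copy every KAFKA_-prefixed environment variable into the client
    // config, translating the name into an rdkafka property name.
    for (key, value) in std::env::vars().filter(|(k, _)| k.starts_with("KAFKA_")) {
        let property = key
            .trim_start_matches("KAFKA_")
            .to_lowercase()
            .replace('_', ".");
        config.set(&property, &value);
    }

    // Values we don't want to vary across environments can be set directly.
    config.set("message.timeout.ms", "5000");

    // Panic on failure: a configuration error should be fixed before running.
    config.create().expect("failed to create Kafka producer")
}
```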
This function can also set any config values that we don't want to vary across environments (here message.timeout.ms is set directly), and will panic if it can't create a producer, because we expect that to be caused by a configuration error that we would want to correct before running the service.
With the appropriate config value in our .env file:
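For local development that might be a single value (the variable name is hypothetical):

```shell
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
```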
The main function can be modified to create a FutureProducer on startup that can be used by our handlers:
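A sketch of the change:

```rust
let pool = create_pool();
let producer = create_producer();
HttpServer::new(move || {
    App::new()
        .data(pool.clone())
        .data(producer.clone()) // FutureProducer is cheap to clone
        .service(receive_data)
})
```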
This uses the same data() method as for the database connection pool to include the FutureProducer as part of application state in Actix.
Sending Messages 📨
With the connection in place, we need to actually produce messages to send over Kafka. We'll send messages as JSON, and will wrap them in a standard envelope to make handling a variety of messages easier for a consumer.
serde_json has a handy json! macro that can take a JSON literal and return a Value, and we can use this to wrap the data to be sent in our envelope format. The envelope format contains a message type, which will always be the same as we only send one type of message, and a messageId field that we will set using a new Uuid. Doing this requires adding a dependency on serde_json and adding an extra feature to the uuid dependency:
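A sketch of the changes (versions are assumptions):

```toml
[dependencies]
serde_json = "1"
# v4 enables Uuid::new_v4() for generating random ids
uuid = { version = "0.8", features = ["serde", "v4"] }
```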
With those in place we can define a function that can take a FutureProducer and some SampleData, then wrap the data in the envelope format and send it to Kafka with the producer:
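A sketch - the envelope field names and the message key are assumptions:

```rust
use rdkafka::producer::{FutureProducer, FutureRecord};
use serde_json::json;
use std::time::Duration;
use uuid::Uuid;

async fn send_sample_message(producer: &FutureProducer, data: &SampleData) {
    // Wrap the data in the envelope format (field names are hypothetical).
    let message = json!({
        "messageId": Uuid::new_v4(),
        "messageType": "sampleReceived",
        "data": data,
    });

    let key = data.id.to_string();
    let payload = message.to_string();
    let record = FutureRecord::to("sample-received").key(&key).payload(&payload);

    // A zero Duration means fail immediately if the queue is full.
    producer.send(record, Duration::from_secs(0)).await.unwrap();
}
```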
This creates a FutureRecord to pass the message to the sample-received topic on Kafka, and provides a Duration to the producer.send method to specify how long it should wait if the queue is full when trying to send the message - setting this to 0 means it will fail immediately if the message cannot be enqueued.
In the case that this message does fail to send, the unwrap() call will cause a panic, even though the Kafka library returns a result. We can add a new error case to the SampleError type so that we can handle the error properly and return a message to the client explaining what failed:
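A sketch of the additional case (the name and message are assumptions):

```rust
#[derive(Debug, Display)]
pub enum SampleError {
    // ...existing cases...
    #[display(fmt = "failed to send sample message")]
    KafkaSend,
}

// ...and the corresponding arm in the ResponseError implementation:
// SampleError::KafkaSend => HttpResponse::InternalServerError().body(self.to_string()),
```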
ℹ It's worth noting that the KafkaError returned from rdkafka provides a lot more detail about what went wrong when sending to Kafka, which could be useful to refine this message in the future - but we won't go into that here.
With the error type updated, the send function can be updated to return a Result, and the receive_data handler can be updated to take a FutureProducer from application state and send a sample message to Kafka after saving it to the database:
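A sketch of the final handler, assuming the send function now returns Result<(), SampleError>:

```rust
#[post("/sample")]
async fn receive_data(
    data: web::Json<SampleData>,
    pool: web::Data<Pool>,
    producer: web::Data<FutureProducer>,
) -> Result<HttpResponse, SampleError> {
    let client = pool.get().await.map_err(|_| SampleError::DbConnection)?;
    insert_sample(&client, &data)
        .await
        .map_err(|_| SampleError::DuplicateId)?;
    send_sample_message(&producer, &data).await?;
    Ok(HttpResponse::Ok().body(data.id.to_string()))
}
```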
With all the pieces in place we can test manually by running with cargo run, making a http request to the endpoint and then consuming the Kafka topic directly (I'll use Kaf for this):
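For example, consuming the topic with kaf:

```shell
kaf consume sample-received
```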
This shows that the message went to our topic in the format we expected 🎉
With all this in place we should now have a service that can receive data over http and then send it to both a Postgres database and a Kafka topic. There are a bunch of potential improvements that could be made - for example, we might end up in a bad state where we write to the database successfully but never send the value over Kafka, which might cause issues in downstream systems - but this should meet our needs for most cases. 🙂
Hopefully this is a useful guide for putting all these pieces together. As I said at the start, I'm not a Rust expert - this is mostly a walkthrough of what I ended up doing after finding various samples and snippets for the individual pieces.
If you have any thoughts/suggestions/feelings/reckonings/anecdotes or otherwise, feel free to reach out on twitter 🙌