Amazon Managed Blockchain + Web3.py Provider

Jan 04, 2021

I started using Amazon Managed Blockchain recently to run some ethereum nodes, and was surprised that I couldn’t find a good Web3.py provider that plugs into Amazon Managed Blockchain.

I went ahead and wrote one - it’s a fairly simple thing to get right; the trickiest part is getting the AWS signatures to work. fortunately, there’s a library that helps with that.

The library depends on requests-auth-aws-sigv4 and requests, so pip install those first.

HTTP Provider

import logging
from typing import Any

import requests
from eth_typing import URI
from web3._utils.request import _get_session
from web3.providers.rpc import HTTPProvider
from web3.types import Middleware, RPCEndpoint, RPCResponse
from requests_auth_aws_sigv4 import AWSSigV4

aws_auth = AWSSigV4(
    'managedblockchain',
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    region=AWS_REGION # us-east-1
)

def make_post_request(
        endpoint_uri: URI, data: bytes, *args: Any, **kwargs: Any) -> bytes:
    kwargs.setdefault('timeout', 10)
    session = _get_session(endpoint_uri)
    # https://github.com/python/mypy/issues/2582
    response = session.post(endpoint_uri, data=data,
                            *args, **kwargs, auth=aws_auth)  # type: ignore
    response.raise_for_status()

    return response.content

class AMBHTTPProvider(HTTPProvider):
    def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse:
        self.logger.debug("Making request HTTP. URI: %s, Method: %s",
                          self.endpoint_uri, method)

        # .decode() since the AWS sig library expects a string.
        request_data = self.encode_rpc_request(method, params).decode()
        raw_response = make_post_request(
            self.endpoint_uri,
            request_data,
            **self.get_request_kwargs()
        )
        response = self.decode_rpc_response(raw_response)
        self.logger.debug("Getting response HTTP. URI: %s, "
                          "Method: %s, Response: %s",
                          self.endpoint_uri, method, response)
        return response

Let me know if this is helpful and I can turn it into a python package. I’ll also implement a WSS version next.

Also, if you’re debugging, be sure to double-check any errors using awscurl.

$ awscurl \
  --access_key=... \
  --secret_key=.. \
  -vv \
  --service=managedblockchain \
  https://nd-your-blockchain-node.ethereum.managedblockchain.us-east-1.amazonaws.com/

Update: I think it should be possible to do this by passing the authenticator into the HTTP provider kwargs. Specifically, see this line in the HTTPProvider implementation.

Problem is, you’ll still run into the bytes encoding/decoding issue. So perhaps the solution is to make a modification to the signer library, instead of Web3.py.

Websocket Provider

Now with HTTP working, let’s get the WebsocketProvider working as well. This one is a little bit easier to implement since we know the hooks and patterns from having done the HTTP Provider.

We’ll need to do two things: implement a custom websocket handshake (to account for AWS’s authentication), and fix binary encoding in the websocket protocol, since the API gateway doesn’t yet support binary transport frames.

Implement Handshake

We need to add some code to the websocket handshake() coroutine so that this HTTP request gets decorated with the amazon signing headers. In the current release of Web3, I couldn’t find a way to hook into the handshake() method, so we subclass WebSocketClientProtocol.

Here are the important sections we add to the handshake method. I purposely excluded the rest of the method and have the full implementation below.

class AMBWebSocketClientProtocol(websockets.client.WebSocketClientProtocol):
    async def handshake(...)
        (...)
        req = Request('GET', f'https://{wsuri.host}/',
                      data='', headers=request_headers)
        prepared_request = req.prepare()
        encrypted_request = aws_auth(prepared_request)
        self.write_http_request(wsuri.resource_name,
                                Headers(encrypted_request.headers))
        status_code, response_headers = await self.read_http_response()
        (...)

We’re using the requests library (which Web3.py uses) to take the HTTP request getting assembled in the websocket, decorate it with the AWS headers, and then submitting it.

Again, not the prettiest solution but it works.

Subclass the Provider

If you tried running this the way it is, you would be able to make a request, but the API Gateway that your node sits behind will reject it and you’ll get error 1003Some diligent googling highlights the issue: the API gateway doesn’t support binary transport frames.

This part was actually really fun to write. I first thought I had to add a lot more code, but it’s just a few additions. Essentially, before you make a request, you have to convert bytes to str, and when parsing a response, str to bytes. That’s where the subclassed websocket provider comes into play:

class AMBWebsocketProvider(web3.providers.WebsocketProvider):
    def encode_rpc_request(self, method: RPCEndpoint, params: Any):
        as_bytes = super().encode_rpc_request(method, params)
        return as_bytes.decode()

    # typing: ignore
    def decode_rpc_response(self, raw_response: str):  # typing: ignore
        as_bytes = raw_response.encode()
        return super().decode_rpc_response(as_bytes)

Now:

>>> await async_w3.isConnected()
True
>>> await async_w3.eth.block_number()
11591745

Success.

Full code below:

from .websocket import AMBWebsocketProvider, handle_create_protocol

async_provider = AMBWebsocketProvider(
    endpoint_uri=config.env('AMB_WS_ENDPOINT'),
    websocket_kwargs=dict(
        create_protocol=handle_create_protocol
    )
)
async_w3 = Web3(async_provider)
import collections.abc
import websockets
from websockets.handshake import build_request
from websockets.http import Headers
from .signer import aws_auth
from typing import (Optional, Sequence, Any)
from requests.models import PreparedRequest
from requests import Request
from web3.providers import WebsocketProvider
from base64 import b64encode, b64decode
import asyncio
from web3.types import (
    RPCEndpoint,
    RPCResponse,
)

class AMBWebsocketProvider(WebsocketProvider):
    def encode_rpc_request(self, method: RPCEndpoint, params: Any):
        as_bytes = super().encode_rpc_request(method, params)
        return as_bytes.decode()

    # typing: ignore
    def decode_rpc_response(self, raw_response: str):  # typing: ignore
        as_bytes = raw_response.encode()
        return super().decode_rpc_response(as_bytes)

class AMBWebSocketClientProtocol(websockets.client.WebSocketClientProtocol):
    async def handshake(self,
                        wsuri: websockets.WebSocketURI,
                        origin: Optional[websockets.typing.Origin] = None,
                        available_extensions: Optional[Sequence[websockets.extensions.base.ClientExtensionFactory]] = None,
                        available_subprotocols: Optional[Sequence[websockets.typing.Subprotocol]] = None,
                        extra_headers: Optional[websockets.http.HeadersLike] = None):
        """
        Perform the client side of the opening handshake.

        :param origin: sets the Origin HTTP header
        :param available_extensions: list of supported extensions in the order
            in which they should be used
        :param available_subprotocols: list of supported subprotocols in order
            of decreasing preference
        :param extra_headers: sets additional HTTP request headers; it must be
            a :class:`~websockets.http.Headers` instance, a
            :class:`~collections.abc.Mapping`, or an iterable of ``(name,
            value)`` pairs
        :raises ~websockets.exceptions.InvalidHandshake: if the handshake
            fails

        """
        request_headers = Headers()

        if wsuri.port == (443 if wsuri.secure else 80):  # pragma: no cover
            request_headers["Host"] = wsuri.host
        else:
            request_headers["Host"] = f"{wsuri.host}:{wsuri.port}"

        # if wsuri.user_info:
        #     request_headers["Authorization"] = build_authorization_basic(
        #         *wsuri.user_info
        #     )

        if origin is not None:
            request_headers["Origin"] = origin

        key = build_request(request_headers)

        if available_extensions is not None:
            extensions_header = websockets.headers.build_extension(
                [(extension_factory.name,
                  extension_factory.get_request_params())
                 for extension_factory in available_extensions])
            request_headers["Sec-WebSocket-Extensions"] = extensions_header

        if available_subprotocols is not None:
            protocol_header = websockets.headers.build_subprotocol(
                available_subprotocols)
            request_headers["Sec-WebSocket-Protocol"] = protocol_header

        if extra_headers is not None:
            if isinstance(extra_headers, Headers):
                extra_headers = extra_headers.raw_items()
            elif isinstance(extra_headers, collections.abc.Mapping):
                extra_headers = extra_headers.items()
            for name, value in extra_headers:
                request_headers[name] = value

        request_headers.setdefault("User-Agent", websockets.http.USER_AGENT)

        req = Request('GET', f'https://{wsuri.host}/',
                      data='', headers=request_headers)
        prepared_request = req.prepare()
        encrypted_request = aws_auth(prepared_request)
        self.write_http_request(wsuri.resource_name,
                                Headers(encrypted_request.headers))
        status_code, response_headers = await self.read_http_response()
        if status_code in (301, 302, 303, 307, 308):
            if "Location" not in response_headers:
                raise websockets.exceptions.InvalidHeader("Location")
            raise websockets.exceptions.RedirectHandshake(
                response_headers["Location"])
        elif status_code != 101:
            raise websockets.exceptions.InvalidStatusCode(status_code)

        websockets.handshake.check_response(response_headers, key)

        self.extensions = self.process_extensions(
            response_headers, available_extensions
        )

        self.subprotocol = self.process_subprotocol(
            response_headers, available_subprotocols
        )

        self.connection_open()

def handle_create_protocol(*args, **kwargs):
    return AMBWebSocketClientProtocol(*args, **kwargs)

Feel free to message me if you have any questions: @omarish.