Skip to content

Diameter Protocol Examples

This document provides practical examples of implementing Diameter protocol interfaces in various scenarios.

Basic Client Implementation

Client Setup

import asyncio
import uuid

from diameter.avp import AVP
from diameter.client import DiameterClient

class BasicDiameterClient(DiameterClient):
    """Minimal Diameter client that runs the capabilities exchange on connect.

    Transport and message helpers (``establish_connection``, ``send_request``,
    ``create_request``) are not defined here; they are expected to come from
    the ``DiameterClient`` base class.
    """

    # Result-Code value treated as success by capability_exchange().
    DIAMETER_SUCCESS = 2001

    def __init__(self, host, port, realm):
        """Store the local realm next to the host/port handled by the base.

        Args:
            host: Local Diameter identity, sent as Origin-Host.
            port: Port forwarded to the base-class constructor.
            realm: Local realm, sent as Origin-Realm.
        """
        super().__init__(host, port)
        self.realm = realm

    async def connect(self):
        """Open the transport and complete the capabilities exchange.

        Raises:
            ConnectionError: If the peer answers the CER with a result code
                other than DIAMETER_SUCCESS. Previously the outcome was
                discarded and callers believed they were connected.
        """
        await self.establish_connection()
        if not await self.capability_exchange():
            raise ConnectionError("Capabilities exchange rejected by peer")

    async def capability_exchange(self):
        """Send a CER and report whether the peer accepted it.

        Returns:
            bool: True when the answer's result code is DIAMETER_SUCCESS.
        """
        request = self.create_cer()
        response = await self.send_request(request)
        return response.result_code == self.DIAMETER_SUCCESS

    def create_cer(self):
        """Build the Capabilities-Exchange-Request advertising this client."""
        avps = [
            AVP('Origin-Host', self.host),
            AVP('Origin-Realm', self.realm),
            # NOTE(review): host_ip is not set in this class; presumably the
            # DiameterClient base provides it - confirm.
            AVP('Host-IP-Address', self.host_ip),
            AVP('Vendor-Id', 0),
            AVP('Product-Name', 'Example Diameter Client'),
            AVP('Supported-Vendor-Id', 10415),  # 3GPP
            AVP('Auth-Application-Id', 4),  # Diameter Credit Control
        ]
        return self.create_request('Capabilities-Exchange-Request', avps)

Usage Example

async def main():
    """Connect the example client and keep sending watchdog requests.

    Any exception raised while connecting or while the keep-alive loop runs
    is printed rather than propagated, and the client is always disconnected
    on the way out.
    """
    settings = {
        'host': 'client.example.com',
        'port': 3868,
        'realm': 'example.com',
    }
    watchdog_interval = 30  # delay handed to asyncio.sleep between watchdogs
    client = BasicDiameterClient(**settings)

    try:
        await client.connect()
        print("Connected to Diameter server")

        # Keep the connection alive until something fails.
        while True:
            await client.send_dwr()
            await asyncio.sleep(watchdog_interval)
    except Exception as exc:
        print(f"Error: {str(exc)}")
    finally:
        await client.disconnect()


if __name__ == '__main__':
    asyncio.run(main())

Authentication Examples

Authentication Request

async def send_auth_request(client, user_name, service_type,
                            destination_realm='auth.example.com'):
    """Build and send an AA-Request for *user_name*.

    Args:
        client: Connected client exposing ``host``, ``realm``,
            ``create_request`` and an awaitable ``send_request``.
        user_name: Value of the User-Name AVP.
        service_type: Value of the Service-Type AVP.
        destination_realm: Realm the request is routed to. Defaults to the
            realm that was previously hard-coded.

    Returns:
        Whatever ``client.send_request`` resolves to for this request.
    """
    avps = [
        # A fresh UUID per call keeps session identifiers unique.
        # NOTE(review): RFC 6733 section 8.8 requires Session-Id to begin
        # with the sender's DiameterIdentity; confirm whether the library
        # adds that prefix or this literal should carry it.
        AVP('Session-Id', f"session.{uuid.uuid4()}"),
        AVP('Origin-Host', client.host),
        AVP('Origin-Realm', client.realm),
        AVP('Destination-Realm', destination_realm),
        AVP('Auth-Application-Id', 1),  # NASREQ
        AVP('User-Name', user_name),
        AVP('Service-Type', service_type),
    ]

    request = client.create_request('AA-Request', avps)
    return await client.send_request(request)

# Usage
async def authenticate_user(client, username):
    """Authenticate *username* and report the outcome as a boolean.

    Sends an AA-Request with Service-Type 2 (FRAMED) and prints a status
    line. Every exception raised along the way is printed and turned into
    a ``False`` result instead of propagating.

    Returns:
        bool: True only when the answer's result code is 2001.
    """
    try:
        reply = await send_auth_request(
            client=client,
            user_name=username,
            service_type=2,  # FRAMED
        )

        # Guard clause: anything but 2001 counts as a failed authentication.
        if reply.result_code != 2001:
            print(f"Authentication failed: {reply.error_message}")
            return False

        print(f"User {username} authenticated successfully")
        return True

    except Exception as exc:
        print(f"Authentication error: {str(exc)}")
        return False

Charging Examples

Real-time Charging Session

class ChargingError(Exception):
    """Raised when a credit-control answer carries a non-success result code."""


class ChargingSession:
    """One credit-control session (initial / update / termination requests).

    Each request carries the same Session-Id and a CC-Request-Number that
    increases by one per request built.
    """

    def __init__(self, client, subscription_id,
                 destination_realm='charging.example.com',
                 service_context_id='32251@3gpp.org'):
        """Prepare a session; nothing is sent until start() is awaited.

        Args:
            client: Client exposing ``host``, ``realm``, ``create_request``
                and an awaitable ``send_request``.
            subscription_id: Value of the Subscription-Id AVP.
            destination_realm: Realm the requests are routed to. Defaults to
                the realm that was previously hard-coded.
            service_context_id: Value of the Service-Context-Id AVP.
                NOTE(review): the original literal was mangled by e-mail
                obfuscation during extraction ('[email protected]');
                '32251@3gpp.org' (3GPP PS-charging context) is a
                representative value - confirm the intended one.
        """
        self.client = client
        self.subscription_id = subscription_id
        self.destination_realm = destination_realm
        self.service_context_id = service_context_id
        # NOTE(review): RFC 6733 section 8.8 requires Session-Id to begin
        # with the sender's DiameterIdentity; confirm whether the library
        # adds that prefix.
        self.session_id = f"charging.{uuid.uuid4()}"
        self.request_number = 0

    async def start(self):
        """Initialize charging session.

        Returns:
            The 'Granted-Service-Unit' AVP of the answer.

        Raises:
            ChargingError: If the answer's result code is not 2001.
        """
        request = self._create_ccr('INITIAL_REQUEST')
        return await self._send_checked(
            request, "Failed to start charging session"
        )

    async def update(self, used_units):
        """Update charging session with usage.

        Args:
            used_units: Value reported in the Used-Service-Unit AVP.

        Returns:
            The 'Granted-Service-Unit' AVP of the answer.

        Raises:
            ChargingError: If the answer's result code is not 2001.
        """
        request = self._create_ccr(
            'UPDATE_REQUEST',
            used_service_unit=used_units,
        )
        return await self._send_checked(
            request, "Failed to update charging session"
        )

    async def terminate(self, used_units):
        """End charging session.

        The answer is not inspected: termination is best-effort.
        """
        request = self._create_ccr(
            'TERMINATION_REQUEST',
            used_service_unit=used_units,
        )
        await self.client.send_request(request)

    async def _send_checked(self, request, failure_message):
        """Send *request*; return its granted units or raise ChargingError."""
        response = await self.client.send_request(request)

        if response.result_code != 2001:
            raise ChargingError(failure_message)

        return response.get_avp('Granted-Service-Unit')

    def _create_ccr(self, request_type, used_service_unit=None):
        """Build a Credit-Control-Request and advance the request counter."""
        avps = [
            AVP('Session-Id', self.session_id),
            AVP('Origin-Host', self.client.host),
            AVP('Origin-Realm', self.client.realm),
            AVP('Destination-Realm', self.destination_realm),
            AVP('Auth-Application-Id', 4),
            AVP('Service-Context-Id', self.service_context_id),
            AVP('CC-Request-Type', request_type),
            AVP('CC-Request-Number', self.request_number),
            AVP('Subscription-Id', self.subscription_id),
        ]

        if used_service_unit is not None:
            avps.append(AVP('Used-Service-Unit', used_service_unit))

        # Bump only after the AVP list is built so this request keeps the
        # current number and the next one gets the following number.
        self.request_number += 1
        return self.client.create_request('Credit-Control-Request', avps)

Usage Monitoring

async def monitor_usage(client, subscription_id, total_quota):
    """Run a simulated charging session until *total_quota* units are used.

    Errors are printed rather than propagated. The session is terminated
    exactly once, and only if it was successfully started; previously a
    failed start() still sent a termination request, and a failing
    terminate() was retried from the exception handler.

    Args:
        client: Client handed to ChargingSession.
        subscription_id: Subscriber identifier handed to ChargingSession.
        total_quota: Number of units after which monitoring stops.
    """
    session = ChargingSession(client, subscription_id)
    used_units = 0
    started = False

    try:
        # Start session
        granted_units = await session.start()
        started = True

        # Monitor usage
        while used_units < total_quota:
            # Simulate usage
            current_usage = min(granted_units, 1000)
            if current_usage <= 0:
                # Nothing was granted: usage can never advance, so looping
                # on would spin forever. Stop and terminate instead.
                break
            used_units += current_usage

            # Update when 80% of granted units used
            if current_usage >= (granted_units * 0.8):
                granted_units = await session.update(current_usage)

            await asyncio.sleep(1)  # Simulate time passing

    except ChargingError as e:
        print(f"Charging error: {str(e)}")

    except Exception as e:
        print(f"Unexpected error: {str(e)}")

    finally:
        # Terminate session (also runs when the task is cancelled).
        if started:
            try:
                await session.terminate(used_units)
            except Exception as e:
                print(f"Failed to terminate charging session: {str(e)}")

Load Balancing Example

class LoadBalancedDiameterClient:
    """Spread requests over several peers in round-robin order with failover."""

    def __init__(self, config):
        """Read the peer list; no connection is opened here.

        Args:
            config: Mapping with a 'peers' list. Each peer is a mapping with
                'host', 'port' and 'realm' keys.
        """
        self.peers = config['peers']
        self.current_peer = 0  # index of the peer the next request starts at
        self.clients = {}  # peer host -> connected client

    async def initialize(self):
        """Initialize connections to all peers"""
        for peer in self.peers:
            # NOTE(review): BasicDiameterClient calls the base constructor
            # with (host, port) only and defines connect() itself; confirm
            # that DiameterClient accepts `realm` and provides connect().
            client = DiameterClient(
                host=peer['host'],
                port=peer['port'],
                realm=peer['realm']
            )
            await client.connect()
            self.clients[peer['host']] = client

    async def send_request(self, request):
        """Send request with round-robin load balancing

        Every call starts at the next peer in the rotation. On
        ConnectionError the following peer is tried, until each peer has
        been attempted once.

        Raises:
            ConnectionError: If every peer failed (or no peer is configured).
        """
        peer_count = len(self.peers)
        last_error = None

        for _ in range(peer_count):
            peer = self.peers[self.current_peer]
            # Advance before sending so the rotation moves on success as
            # well as on failure; advancing only on failure pinned all
            # traffic to a single healthy peer.
            self.current_peer = (self.current_peer + 1) % peer_count

            try:
                return await self.clients[peer['host']].send_request(request)
            except ConnectionError as exc:
                # Try next peer
                last_error = exc

        raise ConnectionError("All peers unavailable") from last_error

Vendor-Specific Examples

Nokia Integration

class NokiaChargingClient(DiameterClient):
    """Diameter client that builds Nokia vendor-specific service requests."""

    def __init__(self, config):
        """Configure the client from a mapping.

        Args:
            config: Mapping with 'host' and 'port', and optionally 'realm'.
        """
        super().__init__(
            host=config['host'],
            port=config['port']
        )
        # create_service_request() reads self.realm, which was never stored
        # here. Only assign when configured, so a realm supplied by the base
        # class (if any) is not overwritten.
        if 'realm' in config:
            self.realm = config['realm']
        self.vendor_id = 28458  # Nokia

    def create_service_request(self, session_id, service_type):
        """Build a 'Service-Request' carrying the Nokia service type.

        Args:
            session_id: Value of the Session-Id AVP.
            service_type: Value of the Nokia-Service-Type AVP.

        Returns:
            The request object produced by ``create_request``.
        """
        avps = [
            AVP('Session-Id', session_id),
            AVP('Origin-Host', self.host),
            AVP('Origin-Realm', self.realm),
            AVP('Vendor-Id', self.vendor_id),
            AVP('Nokia-Service-Type', service_type),
            # Add other Nokia-specific AVPs
        ]
        return self.create_request('Service-Request', avps)

Ericsson Integration

class EricssonChargingClient(DiameterClient):
    """Diameter client that builds Ericsson vendor-specific quota requests."""

    def __init__(self, config):
        """Configure the client from a mapping.

        Args:
            config: Mapping with 'host' and 'port', and optionally 'realm'.
        """
        super().__init__(
            host=config['host'],
            port=config['port']
        )
        # create_quota_request() reads self.realm, which was never stored
        # here. Only assign when configured, so a realm supplied by the base
        # class (if any) is not overwritten.
        if 'realm' in config:
            self.realm = config['realm']
        self.vendor_id = 193  # Ericsson

    def create_quota_request(self, session_id, quota_type):
        """Build a 'Quota-Request' carrying the Ericsson quota type.

        Args:
            session_id: Value of the Session-Id AVP.
            quota_type: Value of the Ericsson-Quota-Type AVP.

        Returns:
            The request object produced by ``create_request``.
        """
        avps = [
            AVP('Session-Id', session_id),
            AVP('Origin-Host', self.host),
            AVP('Origin-Realm', self.realm),
            AVP('Vendor-Id', self.vendor_id),
            AVP('Ericsson-Quota-Type', quota_type),
            # Add other Ericsson-specific AVPs
        ]
        return self.create_request('Quota-Request', avps)

Error Handling Examples

Retry Mechanism

class RetryableClient(DiameterClient):
    """Diameter client that retries failed requests with exponential backoff."""

    async def send_request_with_retry(self, request, max_retries=3):
        """Send *request*, retrying on connection and non-permanent errors.

        The wait before the next attempt starts at 1 and doubles after each
        failure (1, 2, 4, ... as passed to ``asyncio.sleep``).

        Args:
            request: Request forwarded unchanged to ``send_request``.
            max_retries: Total number of attempts, at least 1.

        Returns:
            The result of the first successful ``send_request`` call.

        Raises:
            ValueError: If ``max_retries`` is below 1. Previously such a
                value returned None without sending anything.
            ConnectionError: If the last attempt failed to connect.
            DiameterError: Immediately when ``is_permanent()`` is true,
                otherwise after the last attempt.
        """
        if max_retries < 1:
            raise ValueError("max_retries must be at least 1")

        backoff = 1

        for attempt in range(1, max_retries + 1):
            try:
                return await self.send_request(request)

            except ConnectionError:
                if attempt == max_retries:
                    raise

            except DiameterError as e:
                # Permanent failures cannot succeed on retry.
                if e.is_permanent() or attempt == max_retries:
                    raise

            # Exponential backoff, shared by both retryable error kinds.
            await asyncio.sleep(backoff)
            backoff *= 2

Error Response Handling

class ErrorHandler:
    """Translate errors into short human-readable messages."""

    # Result code -> message for the codes with a dedicated description.
    _KNOWN_RESULT_CODES = {
        4001: "Authentication failed",  # DIAMETER_AUTHENTICATION_REJECTED
        4010: "Service denied",         # DIAMETER_END_USER_SERVICE_DENIED
        4012: "Credit limit reached",   # DIAMETER_CREDIT_LIMIT_REACHED
        5001: "Unsupported AVP",        # DIAMETER_AVP_UNSUPPORTED
        5004: "Invalid AVP value",      # DIAMETER_INVALID_AVP_VALUE
        5012: "Unable to comply",       # DIAMETER_UNABLE_TO_COMPLY
    }

    @staticmethod
    def handle_error(error):
        """Return a description for *error*.

        DiameterError instances are described by their result code, with a
        generic fallback for codes that have no dedicated message. Any other
        object is reported as an unexpected error using its string form.
        """
        if not isinstance(error, DiameterError):
            return f"Unexpected error: {str(error)}"

        code = error.result_code
        message = ErrorHandler._KNOWN_RESULT_CODES.get(code)
        if message is None:
            return f"Diameter error: {code}"
        return message

Configuration Examples

Basic Configuration

# Minimal configuration: the client's own identity plus connection tuning.
diameter:
  # Local identity; host, port and realm correspond to the constructor
  # arguments used in the client examples.
  client:
    host: client.example.com
    port: 3868
    realm: example.com
    # 10415 is the 3GPP vendor id used in the capabilities-exchange example.
    vendor_id: 10415
    product_name: Example Client

  # NOTE(review): units are not stated; timeout and keepalive are presumably
  # seconds (the usage example sleeps 30 between watchdogs) - confirm.
  connection:
    retry_count: 3
    timeout: 5
    keepalive: 30

Advanced Configuration

# Extended configuration: the basic settings plus TLS, peers and monitoring.
diameter:
  # Local identity; same values as in the basic configuration.
  client:
    host: client.example.com
    port: 3868
    realm: example.com
    # 10415 is the 3GPP vendor id used in the capabilities-exchange example.
    vendor_id: 10415
    product_name: Example Client

  # NOTE(review): units are not stated; timeout and keepalive are presumably
  # seconds - confirm.
  connection:
    retry_count: 3
    timeout: 5
    keepalive: 30
    max_pending_requests: 10000

  # TLS switch and the certificate, key and CA file paths.
  security:
    tls_enabled: true
    cert_file: /etc/diameter/certs/client.pem
    key_file: /etc/diameter/certs/client.key
    ca_file: /etc/diameter/certs/ca.pem

  # Peer list. The LoadBalancedDiameterClient example reads host, port and
  # realm from each entry of a top-level 'peers' key; it does not read
  # 'weight'. NOTE(review): here 'peers' is nested under 'diameter' - pass
  # the matching sub-mapping or confirm how the config is loaded.
  peers:
    - host: server1.example.com
      port: 3868
      realm: example.com
      weight: 100
    - host: server2.example.com
      port: 3868
      realm: example.com
      weight: 100

  # Metrics and logging settings.
  monitoring:
    metrics_enabled: true
    prometheus_port: 9090
    log_level: INFO

Testing Examples

Unit Test

import unittest
from unittest.mock import Mock, patch
from diameter.client import DiameterClient

class TestDiameterClient(unittest.IsolatedAsyncioTestCase):
    """Unit tests for client authentication with the transport mocked out.

    IsolatedAsyncioTestCase (Python 3.8+) is required: on a plain TestCase
    an ``async def`` test only creates a coroutine that is never awaited,
    so the assertions never run and the test passes vacuously.
    """

    def setUp(self):
        """Create a fresh client for every test; no connection is opened."""
        self.client = DiameterClient(
            host='test.example.com',
            port=3868,
            realm='example.com'
        )

    # AsyncMock makes the patched send_request awaitable regardless of how
    # the real method is declared.
    @patch('diameter.client.DiameterClient.send_request',
           new_callable=unittest.mock.AsyncMock)
    async def test_authentication(self, mock_send):
        """A 2001 answer must make authenticate() report success."""
        # Arrange
        mock_response = Mock()
        mock_response.result_code = 2001
        mock_send.return_value = mock_response

        # Act
        # NOTE(review): authenticate() is not defined in the examples shown
        # here; confirm DiameterClient provides it.
        result = await self.client.authenticate('test_user')

        # Assert
        self.assertTrue(result)
        mock_send.assert_awaited_once()

    @patch('diameter.client.DiameterClient.send_request',
           new_callable=unittest.mock.AsyncMock)
    async def test_authentication_failure(self, mock_send):
        """A 4001 answer must make authenticate() report failure."""
        # Arrange
        mock_response = Mock()
        mock_response.result_code = 4001
        mock_send.return_value = mock_response

        # Act
        result = await self.client.authenticate('test_user')

        # Assert
        self.assertFalse(result)
        mock_send.assert_awaited_once()

Integration Test

import pytest
from diameter.client import DiameterClient

@pytest.mark.integration
class TestDiameterIntegration:
    """Integration tests that talk to a Diameter server at test.example.com.

    NOTE(review): pytest does not run ``async def`` tests or fixtures by
    itself; an asyncio plugin (for example pytest-asyncio in auto mode) is
    presumably configured for this project - confirm.
    """

    @pytest.fixture
    async def client(self):
        """Yield a connected client and disconnect it after the test."""
        client = DiameterClient(
            host='test.example.com',
            port=3868,
            realm='example.com'
        )
        await client.connect()
        yield client
        await client.disconnect()

    async def test_capability_exchange(self, client):
        """The capabilities exchange must be answered with 2001."""
        # NOTE(review): BasicDiameterClient.capability_exchange() returns a
        # bool, not a response object; confirm what DiameterClient returns.
        response = await client.capability_exchange()
        assert response.result_code == 2001

    async def test_authentication(self, client):
        """Authenticating the test user must be answered with 2001."""
        response = await client.authenticate('test_user')
        assert response.result_code == 2001

    async def test_charging(self, client):
        """A charging session can be started, updated and terminated."""
        session = ChargingSession(client, 'test_subscription')
        granted_units = await session.start()
        assert granted_units > 0

        await session.update(500)
        await session.terminate(1000)