Эволюция монолитного приложения в микросервисы

Как правило, когда нужно что-то сделать быстро и дёшево, мы не задумываемся над отказоустойчивостью и масштабируемостью нашего приложения, что через некоторое время обязательно приводит к боли. Современные решения позволяют быстро и просто решить эту проблему.
На примере перехода от монолитного приложения к микросервисам, я попробую показать все плюсы и минусы каждого подхода. Статья разделена на три части:



Конечное приложение будет выглядеть, как показано на диаграмме снизу.



Введение


Для того чтобы лучше понять пример, желательно иметь хотя бы базовые знания в Kafka и Docker, приведу несколько на мой взгляд дельных курсов и статей:



Полный код проекта залит на github, можно скачать отсюда.


Часть 1. Монолитное приложение


Для примера монолитного приложения я взял код из официальной документации по Dash (Plotly), посмотреть его можно здесь, в исходном коде проекта находится в папке local_app. Это идеальное решение для быстрого прототипирования.


Исходный код monolith.py
import datetime

import dash
import dash_core_components as dcc
import dash_html_components as html
import plotly
from dash.dependencies import Input, Output

# pip install pyorbital
from pyorbital.orbital import Orbital
satellite = Orbital('TERRA')

external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']

app = dash.Dash(__name__, external_stylesheets=external_stylesheets)
app.layout = html.Div(
    html.Div([
        html.H4('TERRA Satellite Live Feed'),
        html.Div(id='live-update-text'),
        dcc.Graph(id='live-update-graph'),
        dcc.Interval(
            id='interval-component',
            interval=1*1000, # in milliseconds
            n_intervals=0
        )
    ])
)

@app.callback(Output('live-update-text', 'children'),
              [Input('interval-component', 'n_intervals')])
def update_metrics(n):
    lon, lat, alt = satellite.get_lonlatalt(datetime.datetime.now())
    style = {'padding': '5px', 'fontSize': '16px'}
    return [
        html.Span('Longitude: {0:.2f}'.format(lon), style=style),
        html.Span('Latitude: {0:.2f}'.format(lat), style=style),
        html.Span('Altitude: {0:0.2f}'.format(alt), style=style)
    ]

# Multiple components can update everytime interval gets fired.
@app.callback(Output('live-update-graph', 'figure'),
              [Input('interval-component', 'n_intervals')])
def update_graph_live(n):
    satellite = Orbital('TERRA')
    data = {
        'time': [],
        'Latitude': [],
        'Longitude': [],
        'Altitude': []
    }

    # Collect some data
    for i in range(180):
        time = datetime.datetime.now() - datetime.timedelta(seconds=i*20)
        lon, lat, alt = satellite.get_lonlatalt(
            time
        )
        data['Longitude'].append(lon)
        data['Latitude'].append(lat)
        data['Altitude'].append(alt)
        data['time'].append(time)

    # Create the graph with subplots
    fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.2)
    fig['layout']['margin'] = {
        'l': 30, 'r': 10, 'b': 30, 't': 10
    }
    fig['layout']['legend'] = {'x': 0, 'y': 1, 'xanchor': 'left'}

    fig.append_trace({
        'x': data['time'],
        'y': data['Altitude'],
        'name': 'Altitude',
        'mode': 'lines+markers',
        'type': 'scatter'
    }, 1, 1)
    fig.append_trace({
        'x': data['Longitude'],
        'y': data['Latitude'],
        'text': data['time'],
        'name': 'Longitude vs Latitude',
        'mode': 'lines+markers',
        'type': 'scatter'
    }, 2, 1)

    return fig

if __name__ == '__main__':
    app.run_server(debug=True)

Приложение каждую секунду обновляет графики и таким образом эмулируется реал-тайм поступление данных. В качестве источника данных используется python пакет pyorbital, с помощью которого можно производить различные астрономические вычисления (в этом примере рассчитывается положение научно-исследовательского спутника Terra (EOS AM-1)). Полученные данные и графики отображаются в браузере через Dash (Plotly) на локальном хосте: 127.0.0.1:8050.


Данные, которые получаются на выходе — это altitude, longitude и latitude (высота, долгота и широта), т.е. координаты спутника в текущий момент времени, первый график показывает изменение высоты, а второй — долготы и широты соответственно.



(На рисунке показана работа монолитного (оригинального) приложения)


К плюсам можно отнести:



К минусам:



Часть 2. Приложение на основе микросервисов


В исходном коде проекта находится в папке local_microservices_app, для удобства туда же я положил сервис Kafka упакованный в Docker, исходный код которого можно посмотреть здесь (взято с github репозитория Stephane Maarek)


Монолитное приложение я разделил на две части, первая часть — backend (producer.py), генерирует данные и отправляет их в Kafka, вторая часть — frontend (consumer.py, graph_display.py) читает сообщения из Kafka и отображает графики в браузере.


backend:
Producer (производитель данных) вычисляет данные и отправляет их в Kafka раз в одну секунду (в оригинальном приложении данные рассчитывались за каждые 20 секунд)


Исходный код producer.py
from time import sleep
import datetime
from confluent_kafka import Producer
import json
from pyorbital.orbital import Orbital

satellite = Orbital('TERRA')

topic = 'test_topic'

producer = Producer({'bootstrap.servers': 'localhost:9092'})

def acked(err, msg):
    if err is not None:
        print("Failed to deliver message: {}".format(err))
    else:
        print("Produced record to topic {} partition [{}] @ offset {}"
              .format(msg.topic(), msg.partition(), msg.offset()))

# send data every one second
while True:
    time = datetime.datetime.now()
    lon, lat, alt = satellite.get_lonlatalt(time)
    record_value = json.dumps({'lon':lon, 'lat': lat, 'alt': alt, 'time': str(time)})

    producer.produce(topic, key=None, value=record_value, on_delivery=acked)

    producer.poll()
    sleep(1)

frontend:
Consumer (потребитель данных) написан в виде класса MyKafkaConnect и находится в файле consumer.py, при инициализации загружает из Kafka последние 180 (или меньше, если их не достаточно) сообщений. При последующих обращениях докачивает все новые сообщения из Kafka.


Изначальное монолитное приложение (monolith.py) не сильно изменилось, ключевое изменение состоит в том, что данные рассчитываются не на месте, а загружаются через класс MyKafkaConnect, все прежние методы работают фактически также.


Исходный код consumer.py
import datetime
from confluent_kafka import Consumer, TopicPartition
import json
from collections import deque
from time import sleep

class MyKafkaConnect:
    def __init__(self, topic, group, que_len=180):
        self.topic = topic

        self.conf = {
            'bootstrap.servers': 'localhost:9092',
            'group.id': group,
            'enable.auto.commit': True,
        }

        # the application needs a maximum of 180 data units
        self.data = {
            'time': deque(maxlen=que_len),
            'Latitude': deque(maxlen=que_len),
            'Longitude': deque(maxlen=que_len),
            'Altitude': deque(maxlen=que_len)
        }

        consumer = Consumer(self.conf)
        consumer.subscribe([self.topic])

        # download first 180 messges
        self.partition = TopicPartition(topic=self.topic, partition=0)
        low_offset, high_offset = consumer.get_watermark_offsets(self.partition)

        # move offset back on 180 messages
        if high_offset > que_len:
            self.partition.offset = high_offset - que_len
        else:
            self.partition.offset = low_offset

        # set the moved offset to consumer
        consumer.assign([self.partition])

        self.__update_que(consumer)

    # https://docs.confluent.io/current/clients/python.html#delivery-guarantees
    def __update_que(self, consumer):
        try:
            while True:
                msg = consumer.poll(timeout=0.1)
                if msg is None:
                    break
                elif msg.error():
                    print('error: {}'.format(msg.error()))
                    break
                else:
                    record_value = msg.value()
                    json_data = json.loads(record_value.decode('utf-8'))

                    self.data['Longitude'].append(json_data['lon'])
                    self.data['Latitude'].append(json_data['lat'])
                    self.data['Altitude'].append(json_data['alt'])
                    self.data['time'].append(datetime.datetime.strptime(json_data['time'], '%Y-%m-%d %H:%M:%S.%f'))

                    # save local offset
                    self.partition.offset += 1          
        finally:
            # Close down consumer to commit final offsets.
            # It may take some time, that why I save offset locally
            consumer.close()

    def get_graph_data(self):
        consumer = Consumer(self.conf)
        consumer.subscribe([self.topic])  

        # update low and high offsets (don't work without it)
        consumer.get_watermark_offsets(self.partition)

        # set local offset
        consumer.assign([self.partition])

        self.__update_que(consumer)

        # convert data to compatible format
        o = {key: list(value) for key, value in self.data.items()}
        return o        

    def get_last(self):
        lon = self.data['Longitude'][-1]
        lat = self.data['Latitude'][-1]
        alt = self.data['Altitude'][-1]

        return lon, lat, alt

# for test
if __name__ == '__main__':
    connect = MyKafkaConnect(topic='test_topic', group='test_group')

    while True:        
        test = connect.get_graph_data()

        print('number of messages:', len(test['time']), 
            'unique:', len(set(test['time'])), 
            'time:', test['time'][-1].second)

        sleep(0.1)

Исходный код graph_display.py
import datetime

import dash
import dash_core_components as dcc
import dash_html_components as html
import plotly
from dash.dependencies import Input, Output

from consumer import MyKafkaConnect

connect = MyKafkaConnect(topic='test_topic', group='test_group')

external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']

app = dash.Dash(__name__, external_stylesheets=external_stylesheets)
app.layout = html.Div(
    html.Div([
        html.H4('TERRA Satellite Live Feed'),
        html.Div(id='live-update-text'),
        dcc.Graph(id='live-update-graph'),
        dcc.Interval(
            id='interval-component',
            interval=1*1000, # in milliseconds
            n_intervals=0
        )
    ])
)

@app.callback(Output('live-update-text', 'children'),
              [Input('interval-component', 'n_intervals')])
def update_metrics(n):
    lon, lat, alt = connect.get_last()

    print('update metrics')

    style = {'padding': '5px', 'fontSize': '16px'}
    return [
        html.Span('Longitude: {0:.2f}'.format(lon), style=style),
        html.Span('Latitude: {0:.2f}'.format(lat), style=style),
        html.Span('Altitude: {0:0.2f}'.format(alt), style=style)
    ]

# Multiple components can update everytime interval gets fired.
@app.callback(Output('live-update-graph', 'figure'),
              [Input('interval-component', 'n_intervals')])
def update_graph_live(n):
    # Collect some data
    data = connect.get_graph_data()
    print('Update graph, data units:', len(data['time']))

    # Create the graph with subplots
    fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.2)
    fig['layout']['margin'] = {
        'l': 30, 'r': 10, 'b': 30, 't': 10
    }
    fig['layout']['legend'] = {'x': 0, 'y': 1, 'xanchor': 'left'}

    fig.append_trace({
        'x': data['time'],
        'y': data['Altitude'],
        'name': 'Altitude',
        'mode': 'lines+markers',
        'type': 'scatter'
    }, 1, 1)
    fig.append_trace({
        'x': data['Longitude'],
        'y': data['Latitude'],
        'text': data['time'],
        'name': 'Longitude vs Latitude',
        'mode': 'lines+markers',
        'type': 'scatter'
    }, 2, 1)

    return fig

if __name__ == '__main__':
    app.run_server(debug=True)

К плюсам можно отнести:



К минусам:




(На рисунке сверху backend после обновления отправляет вдвое больше сообщений, что видно на графике)


Часть 3. Приложение на основе микросервисов упакованных в Docker


В исходном коде проекта находится в папке docker_microservices_app. От второй части отличается тем, что backend и frontend упакованы в Docker. Также я добавил ещё два микросервиса в backend (ещё два научно-исследовательских спутника Aura (EOS CH-1) и Aqua (EOS PM-1)).


Docker файлы приложения
Исходный код backend/Dockerfile (упакован producer.py)
FROM python:3.7

RUN python -m pip install confluent-kafka

RUN python -m pip install pyorbital

WORKDIR /app

COPY producer.py ./

CMD ["python", "producer.py"]

Исходный код frontend/Dockerfile (упакованы consumer.py и graph_display.py)
FROM python:3.7

RUN python -m pip install confluent-kafka

RUN python -m pip install dash plotly

WORKDIR /app

COPY consumer.py graph_display.py ./

CMD ["python", "graph_display.py"]

Исходный код docker-compose.yml (содержит в себе backend, frontend и Kafka)
version: '2.1'

# Stephane Maarek's kafka-docker
# https://github.com/simplesteph/kafka-stack-docker-compose/blob/master/zk-single-kafka-single.yml
services:
  zoo1:
    image: zookeeper:3.4.9
    hostname: zoo1
    ports:
      - "2181:2181"
    restart: unless-stopped
    environment:
        ZOO_MY_ID: 1
        ZOO_PORT: 2181
        ZOO_SERVERS: server.1=zoo1:2888:3888
    volumes:
      - ./zk-single-kafka-single/zoo1/data:/data
      - ./zk-single-kafka-single/zoo1/datalog:/datalog

  kafka1:
    image: confluentinc/cp-kafka:5.5.0
    hostname: kafka1
    ports:
      - "9092:9092"
    restart: unless-stopped
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./zk-single-kafka-single/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zoo1

  backend_terra:
    build:
      context: ./backend
    restart: unless-stopped
    environment:
      BOOTSTRAP_SERVERS: "kafka1:19092"
      TOPIC: "terra_topic"
      SATELLITE: "TERRA"
    depends_on:
      - kafka1

  backend_aqua:
    build:
      context: ./backend
    restart: unless-stopped
    environment:
      BOOTSTRAP_SERVERS: "kafka1:19092"
      TOPIC: "aqua_topic"
      SATELLITE: "AQUA"
    depends_on:
      - kafka1

  backend_aura:
    build:
      context: ./backend
    restart: unless-stopped
    environment:
      BOOTSTRAP_SERVERS: "kafka1:19092"
      TOPIC: "aura_topic"
      SATELLITE: "AURA"
    depends_on:
      - kafka1

  frontend:
    build:
      context: ./frontend
    ports:
      - "8050:8050"
    restart: unless-stopped
    environment:
      BOOTSTRAP_SERVERS: "kafka1:19092"
    depends_on:
      - backend_terra
      - backend_aqua
      - backend_aura

Python файлы приложения
Исходный код backend/producer.py
from time import sleep
import datetime
from confluent_kafka import Producer
import json
from pyorbital.orbital import Orbital
import os

topic = os.environ['TOPIC'] 
bootstrap_servers = os.environ['BOOTSTRAP_SERVERS'] 
s_name = os.environ['SATELLITE']

satellite = Orbital(s_name)

producer = Producer({'bootstrap.servers': bootstrap_servers})

def acked(err, msg):
    if err is not None:
        print("Failed to deliver message: {}".format(err))
    else:
        print("Produced record to topic {} partition [{}] @ offset {}"
              .format(msg.topic(), msg.partition(), msg.offset()))

# send data every one second
while True:
    time = datetime.datetime.now()
    lon, lat, alt = satellite.get_lonlatalt(time)
    record_value = json.dumps({'lon':lon, 'lat': lat, 'alt': alt, 'time': str(time)})

    producer.produce(topic, key=None, value=record_value, on_delivery=acked)

    producer.poll()
    sleep(1)

Исходный код frontend/consumer.py (не изменился)
import datetime
from confluent_kafka import Consumer, TopicPartition
import json
from collections import deque
from time import sleep

class MyKafkaConnect:
    def __init__(self, topic, group, que_len=180):
        self.topic = topic

        self.conf = {
            'bootstrap.servers': 'localhost:9092',
            'group.id': group,
            'enable.auto.commit': True,
        }

        # the application needs a maximum of 180 data units
        self.data = {
            'time': deque(maxlen=que_len),
            'Latitude': deque(maxlen=que_len),
            'Longitude': deque(maxlen=que_len),
            'Altitude': deque(maxlen=que_len)
        }

        consumer = Consumer(self.conf)
        consumer.subscribe([self.topic])

        # download first 180 messges
        self.partition = TopicPartition(topic=self.topic, partition=0)
        low_offset, high_offset = consumer.get_watermark_offsets(self.partition)

        # move offset back on 180 messages
        if high_offset > que_len:
            self.partition.offset = high_offset - que_len
        else:
            self.partition.offset = low_offset

        # set the moved offset to consumer
        consumer.assign([self.partition])

        self.__update_que(consumer)

    # https://docs.confluent.io/current/clients/python.html#delivery-guarantees
    def __update_que(self, consumer):
        try:
            while True:
                msg = consumer.poll(timeout=0.1)
                if msg is None:
                    break
                elif msg.error():
                    print('error: {}'.format(msg.error()))
                    break
                else:
                    record_value = msg.value()
                    json_data = json.loads(record_value.decode('utf-8'))

                    self.data['Longitude'].append(json_data['lon'])
                    self.data['Latitude'].append(json_data['lat'])
                    self.data['Altitude'].append(json_data['alt'])
                    self.data['time'].append(datetime.datetime.strptime(json_data['time'], '%Y-%m-%d %H:%M:%S.%f'))

                    # save local offset
                    self.partition.offset += 1          
        finally:
            # Close down consumer to commit final offsets.
            # It may take some time, that why I save offset locally
            consumer.close()

    def get_graph_data(self):
        consumer = Consumer(self.conf)
        consumer.subscribe([self.topic])  

        # update low and high offsets (don't work without it)
        consumer.get_watermark_offsets(self.partition)

        # set local offset
        consumer.assign([self.partition])

        self.__update_que(consumer)

        # convert data to compatible format
        o = {key: list(value) for key, value in self.data.items()}
        return o        

    def get_last(self):
        lon = self.data['Longitude'][-1]
        lat = self.data['Latitude'][-1]
        alt = self.data['Altitude'][-1]

        return lon, lat, alt

# for test
if __name__ == '__main__':
    connect = MyKafkaConnect(topic='test_topic', group='test_group')

    while True:        
        test = connect.get_graph_data()

        print('number of messages:', len(test['time']), 
            'unique:', len(set(test['time'])), 
            'time:', test['time'][-1].second)

        sleep(0.1)

Исходный код frontend/graph_display.py
import datetime

import dash
import dash_core_components as dcc
import dash_html_components as html
import plotly
from dash.dependencies import Input, Output

from consumer import MyKafkaConnect

external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']

app = dash.Dash(__name__, external_stylesheets=external_stylesheets)
app.layout = html.Div(
    html.Div([
        html.Div([
            html.H4('TERRA Satellite Live Feed'),
            html.Div(id='terra-text'),
            dcc.Graph(id='terra-graph')
            ], className="four columns"),
        html.Div([
            html.H4('AQUA Satellite Live Feed'),
            html.Div(id='aqua-text'),
            dcc.Graph(id='aqua-graph')
            ], className="four columns"),
        html.Div([
            html.H4('AURA Satellite Live Feed'),
            html.Div(id='aura-text'),
            dcc.Graph(id='aura-graph')
            ], className="four columns"),
        dcc.Interval(
            id='interval-component',
            interval=1*1000, # in milliseconds
            n_intervals=0
        )
    ], className="row")
)

def create_graphs(topic, live_update_text, live_update_graph):

    connect = MyKafkaConnect(topic=topic, group='test_group')

    @app.callback(Output(live_update_text, 'children'),
                  [Input('interval-component', 'n_intervals')])
    def update_metrics_terra(n):
        lon, lat, alt = connect.get_last()

        print('update metrics')

        style = {'padding': '5px', 'fontSize': '15px'}
        return [
            html.Span('Longitude: {0:.2f}'.format(lon), style=style),
            html.Span('Latitude: {0:.2f}'.format(lat), style=style),
            html.Span('Altitude: {0:0.2f}'.format(alt), style=style)
        ]

    # Multiple components can update everytime interval gets fired.
    @app.callback(Output(live_update_graph, 'figure'),
                  [Input('interval-component', 'n_intervals')])
    def update_graph_live_terra(n):
        # Collect some data
        data = connect.get_graph_data()
        print('Update graph, data units:', len(data['time']))

        # Create the graph with subplots
        fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.2)
        fig['layout']['margin'] = {
            'l': 30, 'r': 10, 'b': 30, 't': 10
        }
        fig['layout']['legend'] = {'x': 0, 'y': 1, 'xanchor': 'left'}

        fig.append_trace({
            'x': data['time'],
            'y': data['Altitude'],
            'name': 'Altitude',
            'mode': 'lines+markers',
            'type': 'scatter'
        }, 1, 1)
        fig.append_trace({
            'x': data['Longitude'],
            'y': data['Latitude'],
            'text': data['time'],
            'name': 'Longitude vs Latitude',
            'mode': 'lines+markers',
            'type': 'scatter'
        }, 2, 1)

        return fig

create_graphs('terra_topic', 'terra-text', 'terra-graph')
create_graphs('aqua_topic', 'aqua-text', 'aqua-graph')
create_graphs('aura_topic', 'aura-text', 'aura-graph')

if __name__ == '__main__':
    app.run_server(
        host='0.0.0.0',
        port=8050,
        debug=True)

К плюсам можно отнести (кроме тех, что указаны во второй части):



К минусам:



На рисунке ниже показана ситуация, при которой один backend микросервис перестал отправлять данные и так как всё приложение состоит из микросервисов, то оно продолжает работать (в отличие от монолитной конструкции), более того, когда этот микросервис возобновил работу, он подключился на лету, и не потребовалось всё перезапускать.



(Не смотря на то, что один микросервис завис, остальные продолжают работать)


Выводы


В данной статье я рассмотрел, на мой взгляд наиболее очевидные (далеко не все) достоинства и недостатки каждого подхода. Кроме того, хоть внедрение Kafka и кажется дополнительным усложнением проекта, но на самом деле задачи, которые необходимо решить обычно являются типовыми, такими как непрерывное чтение данных из бд или их запись, слив данных из нескольких бд в одну и т.д. Для подобных целей нет необходимости изобретать велосипед, можно использовать готовые решения из Kafka connectors, там есть поддержка для фактически всех хоть сколько-нибудь известных бд.


Дополнительные ссылки по теме:
Python + Kafka =? / Николай Сасковец/ bitnet [Python Meetup 14.09.2019]
Николай Сасковец, Построение микросервисных систем с использованием Kafka