侧边栏壁纸
  • 累计撰写 48 篇文章
  • 累计创建 33 个标签
  • 累计收到 2 条评论

目 录CONTENT

文章目录

ES集群索引模板迁移脚本-从集群A迁移索引以及模板到集群B

Angus
2023-08-17 / 0 评论 / 0 点赞 / 89 阅读 / 6510 字

背景

从集群A迁移索引以及模板到集群B 业务需要新建一套和现有集群一模一样的集群,并把索引mapping和template都复制一份到新的集群去 下面为我用python写的一份脚本

功能支持

  1. rollover创建的索引
  2. 非日期数字结尾的索引,该类索引需要单独处理

代码实现

#!/usr/bin/python3
# -*- coding:utf-8 -*-
import re
import sys

from elasticsearch import Elasticsearch, helpers

# 索引迁移脚本
# ES 连接地址
from elasticsearch.exceptions import NotFoundError

sourceHost = [
    {"host": "172.17.20.45", "port": 9201},
    {"host": "172.17.20.46", "port": 9201},
    {"host": "172.17.20.47", "port": 9201}
]
sourceUser = "atai"
sourcePassword = "123456"
# 日志ES集群 连接地址
destHost = [
    {"host": "172.17.20.125", "port": 9200},
    {"host": "172.17.20.126", "port": 9200},
    {"host": "172.17.20.127", "port": 9200}
]
destUser = "elastic"
destPassword = "gr4cCpPaenaNyeM87E75"

# ES配置
sourceEs = Elasticsearch(
    hosts=sourceHost,
    http_auth=(sourceUser, sourcePassword),
    timeout=3600
)

destEs = Elasticsearch(
    destHost,
    http_auth=(destUser, destPassword),
    timeout=3600
)
migrateIndex = ["*"]
print("初始化完成,开始同步")
res = sourceEs.cat.indices(index=migrateIndex, params={"format": "json"})
# 正则表达式模式
default_pattern = r'^[^\.].*'  # 排除点开头的系统索引
pattern = r'^[^\.].*-000001$'  # 获取所有rollover的初始索引
not_pattern = r'^.*-\d+$'  # 获取非rollover索引,获取后缀非数字的索引
print(res)
# 筛选符合条件的索引
filtered_indices = [
    index['index'] for index in res
    if re.match(default_pattern, index['index'])
]
# 使用rollover创建的索引
rollover_indices = [
    index for index in filtered_indices
    if re.match(pattern, index)
]
# 存在疑问的索引,这种疑问索引有可能是rollover的0000xx这样的索引,所以这个要放最后做处理
none_indices = [
    index for index in filtered_indices
    if not re.match(pattern, index) and re.match(not_pattern, index)
]
# 普通创建的索引
default_indices = [
    index for index in filtered_indices
    if not re.match(not_pattern, index)
]
print(f"存在疑问的索引,脚本开始判断是否是rollover的衍生 {none_indices}")
print(f"rollover操作的索引 {rollover_indices}")
print(f"普通创建的索引 {default_indices}")
for index_name in rollover_indices:
    aliases = sourceEs.indices.get_alias(index=index_name)
    filtered_aliases = [
        alias_name for alias_name, alias_metadata in aliases.get(index_name).get('aliases').items()
        if alias_metadata.get('is_write_index', False) is True
    ]
    if not destEs.indices.exists(index=index_name):
        length = len(filtered_aliases)
        if length > 1:
            print(f"索引 {index_name} 别名过滤is_write_index为true的别名数量大于1个,目前为{length}个,请排查原因,跳过迁移!!")
            continue
        if length == 1:
            print(f"索引 {index_name} 使用rollover,开始rollover创建")
            writeAlias = filtered_aliases[0]
            indexTemplate = sourceEs.indices.get_index_template(name=writeAlias)
            indexTemplate = indexTemplate.get('index_templates')[0].get('index_template')
            response = destEs.indices.put_index_template(name=writeAlias, body=indexTemplate)
            destEs.indices.create(index=index_name, aliases={writeAlias: {"is_write_index": True}})
    else:
        print(f"索引 {index_name} 已在目标集群存在,跳过操作")
for index_name in default_indices:
    if not destEs.indices.exists(index=index_name):
        mappings = sourceEs.indices.get_mapping(index=index_name)
        setting = sourceEs.indices.get_settings(index=index_name)
        settings = setting[index_name]['settings']
        # 删除不必要的元数据
        del settings["index"]["provided_name"]
        del settings["index"]["creation_date"]
        del settings["index"]["uuid"]
        del settings["index"]["version"]
        mappings = mappings[index_name]['mappings']
        destEs.indices.create(index=index_name, mappings=mappings, settings=settings)
        print(f"索引 {index_name} 创建成功")
    else:
        print(f"索引 {index_name} 已在目标集群存在,跳过操作")
warm_index = []
for index_name in none_indices:
    print(f"处理疑问索引 {index_name} ")
    if not destEs.indices.exists(index=index_name):
        aliases = sourceEs.indices.get_alias(index=index_name)
        filtered_aliases = [
            alias_name for alias_name, alias_metadata in aliases.get(index_name).get('aliases').items()
            if alias_metadata.get('is_write_index', False) is True
        ]
        length = len(filtered_aliases)
        if length == 0:
            warm_index.append(index_name)
            continue
        writeAlias = filtered_aliases[0]
        if destEs.indices.exists_alias(name=writeAlias):
            print(f"别名 {writeAlias} 已存在,跳过")
            continue
        create_index_name = writeAlias + "-000001"
        try:
            indexTemplate = sourceEs.indices.get_index_template(name=writeAlias)
            indexTemplate = indexTemplate.get('index_templates')[0].get('index_template')
            response = destEs.indices.put_index_template(name=writeAlias, body=indexTemplate)
            destEs.indices.create(index=create_index_name, aliases={writeAlias: {"is_write_index": True}})
        except NotFoundError:
            print(f"别名 {writeAlias} template不存在")
            mappings = sourceEs.indices.get_mapping(index=index_name)
            setting = sourceEs.indices.get_settings(index=index_name)
            settings = setting[index_name]['settings']
            # 删除不必要的元数据
            del settings["index"]["provided_name"]
            del settings["index"]["creation_date"]
            del settings["index"]["uuid"]
            del settings["index"]["version"]
            mappings = mappings[index_name]['mappings']
            destEs.indices.create(index=create_index_name, mappings=mappings, settings=settings
                                  , aliases={writeAlias: {"is_write_index": True}})
        print(f"索引 {index_name} 创建成功")
    else:
        print(f"索引 {index_name} 已在目标集群存在,跳过操作")
print(f"存在疑问的索引 {warm_index}")
print("索引迁移结束")
sourceEs.close()
destEs.close()
sys.exit(1)

运行到最后代码会输出没处理的索引,需要人工手动处理

0

评论区