背景
从集群A迁移索引以及模板到集群B 业务需要新建一套和现有集群一模一样的集群,并把索引mapping和template都复制一份到新的集群去 下面为我用python写的一份脚本
功能支持
- rollover创建的索引
- 非日期数字结尾的索引,该类索引需要单独处理
代码实现
#!/usr/bin/python3
# -*- coding:utf-8 -*-
import re
import sys
from elasticsearch import Elasticsearch, helpers
# 索引迁移脚本
# ES 连接地址
from elasticsearch.exceptions import NotFoundError
sourceHost = [
{"host": "172.17.20.45", "port": 9201},
{"host": "172.17.20.46", "port": 9201},
{"host": "172.17.20.47", "port": 9201}
]
sourceUser = "atai"
sourcePassword = "123456"
# 日志ES集群 连接地址
destHost = [
{"host": "172.17.20.125", "port": 9200},
{"host": "172.17.20.126", "port": 9200},
{"host": "172.17.20.127", "port": 9200}
]
destUser = "elastic"
destPassword = "gr4cCpPaenaNyeM87E75"
# ES配置
sourceEs = Elasticsearch(
hosts=sourceHost,
http_auth=(sourceUser, sourcePassword),
timeout=3600
)
destEs = Elasticsearch(
destHost,
http_auth=(destUser, destPassword),
timeout=3600
)
migrateIndex = ["*"]
print("初始化完成,开始同步")
res = sourceEs.cat.indices(index=migrateIndex, params={"format": "json"})
# 正则表达式模式
default_pattern = r'^[^\.].*' # 排除点开头的系统索引
pattern = r'^[^\.].*-000001$' # 获取所有rollover的初始索引
not_pattern = r'^.*-\d+$' # 获取非rollover索引,获取后缀非数字的索引
print(res)
# 筛选符合条件的索引
filtered_indices = [
index['index'] for index in res
if re.match(default_pattern, index['index'])
]
# 使用rollover创建的索引
rollover_indices = [
index for index in filtered_indices
if re.match(pattern, index)
]
# 存在疑问的索引,这种疑问索引有可能是rollover的0000xx这样的索引,所以这个要放最后做处理
none_indices = [
index for index in filtered_indices
if not re.match(pattern, index) and re.match(not_pattern, index)
]
# 普通创建的索引
default_indices = [
index for index in filtered_indices
if not re.match(not_pattern, index)
]
print(f"存在疑问的索引,脚本开始判断是否是rollover的衍生 {none_indices}")
print(f"rollover操作的索引 {rollover_indices}")
print(f"普通创建的索引 {default_indices}")
for index_name in rollover_indices:
aliases = sourceEs.indices.get_alias(index=index_name)
filtered_aliases = [
alias_name for alias_name, alias_metadata in aliases.get(index_name).get('aliases').items()
if alias_metadata.get('is_write_index', False) is True
]
if not destEs.indices.exists(index=index_name):
length = len(filtered_aliases)
if length > 1:
print(f"索引 {index_name} 别名过滤is_write_index为true的别名数量大于1个,目前为{length}个,请排查原因,跳过迁移!!")
continue
if length == 1:
print(f"索引 {index_name} 使用rollover,开始rollover创建")
writeAlias = filtered_aliases[0]
indexTemplate = sourceEs.indices.get_index_template(name=writeAlias)
indexTemplate = indexTemplate.get('index_templates')[0].get('index_template')
response = destEs.indices.put_index_template(name=writeAlias, body=indexTemplate)
destEs.indices.create(index=index_name, aliases={writeAlias: {"is_write_index": True}})
else:
print(f"索引 {index_name} 已在目标集群存在,跳过操作")
for index_name in default_indices:
if not destEs.indices.exists(index=index_name):
mappings = sourceEs.indices.get_mapping(index=index_name)
setting = sourceEs.indices.get_settings(index=index_name)
settings = setting[index_name]['settings']
# 删除不必要的元数据
del settings["index"]["provided_name"]
del settings["index"]["creation_date"]
del settings["index"]["uuid"]
del settings["index"]["version"]
mappings = mappings[index_name]['mappings']
destEs.indices.create(index=index_name, mappings=mappings, settings=settings)
print(f"索引 {index_name} 创建成功")
else:
print(f"索引 {index_name} 已在目标集群存在,跳过操作")
warm_index = []
for index_name in none_indices:
print(f"处理疑问索引 {index_name} ")
if not destEs.indices.exists(index=index_name):
aliases = sourceEs.indices.get_alias(index=index_name)
filtered_aliases = [
alias_name for alias_name, alias_metadata in aliases.get(index_name).get('aliases').items()
if alias_metadata.get('is_write_index', False) is True
]
length = len(filtered_aliases)
if length == 0:
warm_index.append(index_name)
continue
writeAlias = filtered_aliases[0]
if destEs.indices.exists_alias(name=writeAlias):
print(f"别名 {writeAlias} 已存在,跳过")
continue
create_index_name = writeAlias + "-000001"
try:
indexTemplate = sourceEs.indices.get_index_template(name=writeAlias)
indexTemplate = indexTemplate.get('index_templates')[0].get('index_template')
response = destEs.indices.put_index_template(name=writeAlias, body=indexTemplate)
destEs.indices.create(index=create_index_name, aliases={writeAlias: {"is_write_index": True}})
except NotFoundError:
print(f"别名 {writeAlias} template不存在")
mappings = sourceEs.indices.get_mapping(index=index_name)
setting = sourceEs.indices.get_settings(index=index_name)
settings = setting[index_name]['settings']
# 删除不必要的元数据
del settings["index"]["provided_name"]
del settings["index"]["creation_date"]
del settings["index"]["uuid"]
del settings["index"]["version"]
mappings = mappings[index_name]['mappings']
destEs.indices.create(index=create_index_name, mappings=mappings, settings=settings
, aliases={writeAlias: {"is_write_index": True}})
print(f"索引 {index_name} 创建成功")
else:
print(f"索引 {index_name} 已在目标集群存在,跳过操作")
print(f"存在疑问的索引 {warm_index}")
print("索引迁移结束")
sourceEs.close()
destEs.close()
sys.exit(1)
运行到最后代码会输出没处理的索引,需要人工手动处理
评论区