Files
config-ros/main.py
2025-08-18 15:48:15 +08:00

310 lines
8.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from librouteros import connect,Api
import csv
import ssl
import threading
'''
自动配置 ros 脚本,需要安装 librouteros 库:
pip install librouteros
配置文件格式为 CSV包含以下字段
- index: 配置索引,【重要】这个 index 用来生成对应 l2tp-out 用户名中的序号,每个节点都是唯一且固定的
- name: 城市名
- code: 城市代码
- gateway: 网关地址
- public: 公网 IP
- mask: 公网子网掩码
- private: 内网 IP
如果需要添加或修改配置项,在编辑前记得先备份 config.csv 文件
如果需要修改配置内容,在 start 函数里 “配置 ros” 部分添加或修改函数调用
'''
threads = []
failed = []
def main():
# 加载配置文件
config_data = []
with open('config.csv', 'r', encoding='utf-8-sig') as file:
reader = csv.DictReader(file)
for row in reader:
config_data.append(row)
# 多线程执行 ros 配置
for _, config in enumerate(config_data):
thread = threading.Thread(target=start, args=(config,))
thread.start()
threads.append(thread)
# 等待所有线程完成
for thread in threads:
thread.join()
# 输出结果
print('=' * 20)
print('配置完成统计({}/{}), 失败项:{}'.format(len(config_data)-len(failed), len(config_data), len(failed)))
for _, (item, err) in enumerate(failed):
print('{},{},{},{}'.format(item['public'], item['name'], item['code'], err))
def start(config):
print('{}: 配置{}({})'.format(str(config['index']).zfill(3), config['name'], config['code']))
# 连接到 ros
conn: Api
try:
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.load_verify_locations('server.crt')
conn = connect(
username='admin',
password='wyongk9815',
host=config['public'],
port=8729,
ssl_wrapper=ctx.wrap_socket,
)
except Exception as err:
failed.append((config, err))
return
# 配置 ros
try:
configLogs(conn, config)
except Exception as err:
failed.append((config, err))
# 关闭连接
conn.close()
def configDefault(conn:Api,config):
configNet(conn, config)
configOuts(conn, config)
configScripts(conn, config)
configLogs(conn, config)
def configNet(conn:Api,config):
'''
配置网络
'''
# 配置路由
routes = conn.path('ip', 'route')
for route in routes:
if route['routing-table'] == '1':
try:
routes.update(**{
'.id': route['.id'],
'gateway': config['gateway']
})
except Exception as e:
print('更新默认路由失败: {}'.format(e))
continue
# 配置地址
addrs = conn.path('ip', 'address')
for addr in addrs:
if addr['interface'] == 'lan':
try:
addrs.update(**{
'.id': addr['.id'],
'address': config['private']
})
except Exception as e:
print('更新 WAN 地址失败: {}'.format(e))
continue
# 刷新 mac 地址
eths = conn.path('interface', 'ethernet')
for eth in eths:
if eth['name'] == 'lan':
tuple(eths('reset-mac-address', **{
'.id': eth['.id']
}))
def configOuts(conn:Api,config):
'''
配置 vpn 出口负载均衡
'''
count = 20
# 配置 ppp
ppps = conn.path('interface', 'l2tp-client')
# 删除旧的 ppp
for ppp in ppps:
if ppp['name'].startswith('l2tp-out'):
try:
ppps.remove(ppp['.id'])
except Exception as e:
print('删除 PPP 失败: {}'.format(e))
continue
# 添加新的 ppp
for i in range(1, count+1):
ppps.add(**{
'name':'l2tp-out{}'.format(i),
'connect-to':'192.168.25{}.25{}'.format((i-1)%3+1, (i-1)%3+1),
'user':'jdzz{}dt{}'.format(i, config['index']),
'password':'123231',
'disabled':'no',
})
# 配置路由
routes = conn.path('ip', 'route')
# 删除旧的路由表
for route in routes:
if str(route['routing-table']).startswith('r'):
try:
routes.remove(route['.id'])
except Exception as e:
print('删除路由表失败: {}'.format(e))
continue
# 添加新的路由表
for i in range(1,count+1):
routeName = 'r{}'.format(i)
routeOut = 'l2tp-out{}'.format(i)
try:
routes.add(**{
'dst-address': '0.0.0.0/0',
'gateway': routeOut,
'routing-table': routeName,
})
except Exception as e:
print('添加路由表失败: {}'.format(e))
continue
def configScripts(conn:Api,config):
'''
配置脚本
'''
scripts = conn.path('system', 'script')
for script in scripts:
if script['name'] == 'up':
with open('scripts/up.rsc', 'rb') as file:
upScript = str(file.read(), encoding='utf-8')
scripts.update(**{
'.id': script['.id'],
'source': upScript
})
elif script['name'] == 'down':
with open('scripts/down.rsc', 'rb') as file:
downScript = str(file.read(), encoding='utf-8')
scripts.update(**{
'.id': script['.id'],
'source': downScript
})
elif script['name'] == 'onlinestatus':
with open('scripts/onlinestatus.rsc', 'rb') as file:
onlineStatusScript = str(file.read(), encoding='utf-8')
scripts.update(**{
'.id': script['.id'],
'source': onlineStatusScript.replace('<IP>', config['public'])
})
elif script['name'] == 'pppoestatus':
with open('scripts/pppoestatus.rsc', 'rb') as file:
pppoeStatusScript = str(file.read(), encoding='utf-8')
scripts.update(**{
'.id': script['.id'],
'source': pppoeStatusScript.replace('<IP>', config['public'])
})
def configLogs(conn:Api,config):
'''
配置日志处理
'''
# 添加日志过滤器
filters = conn.path('ip', 'firewall', 'filter')
for filter in filters:
if filter['comment'] == 'natlog':
try:
filters.remove(filter['.id'])
except Exception as e:
print('删除过滤器失败: {}'.format(e))
continue
filters.add(**{
'chain': 'forward',
'action': 'log',
'comment': 'natlog',
'dst-address': '10.0.0.0/8',
'protocol': 'tcp',
'connection-nat-state': 'srcnat',
'tcp-flags': 'syn',
})
filters.add(**{
'chain': 'forward',
'action': 'log',
'comment': 'natlog',
'src-address': '10.0.0.0/8',
'protocol': 'udp',
'dst-port': '!53',
'connection-nat-state': '!srcnat',
})
# 添加日志动作
actions = conn.path('system', 'logging', 'action')
for action in actions:
if action['name'] in ['logremote', 'logremoteidc']:
try:
actions.remove(action['.id'])
except Exception as e:
print('删除日志动作失败: {}'.format(e))
continue
actions.add(**{
'name': 'logremote',
'target': 'remote',
'src-address': '0.0.0.0',
'remote': '106.119.167.38',
'remote-port': '5775',
})
actions.add(**{
'name': 'logremoteidc',
'target': 'remote',
'src-address': '0.0.0.0',
'remote': '192.168.100.255',
'remote-port': '5775',
})
# 配置日志动作
logs = conn.path('system', 'logging')
for log in logs:
if 'firewall' in log['topics'] and 'info' in log['topics']:
try:
logs.remove(log['.id'])
except Exception as e:
print('删除日志配置失败: {}'.format(e))
continue
logs.add(**{
'topics': 'firewall,info',
'prefix': config['code'],
'action': 'logremote',
})
logs.add(**{
'topics': 'firewall,info',
'prefix': config['code'],
'action': 'logremoteidc',
})
def configDebug(conn:Api,config):
'''
配置调试
'''
main()