i’m currently building small web app to manage Mikrotik router and i stucked in adding Bulk of users/vouchers to UserMan using Python code , i succefully add single user/voucher but in bulk the users/vouchers added without assigning profile. the code is attached
...
# (نفس الكود بدون تغيير حتى نصل إلى الأماكن المعدّلة)
@app.post("/api/add-user")
async def api_add_user(request: Request):
login = request.session.get("login")
if not login:
return JSONResponse({"message": "غير مصرح"}, status_code=401)
data = await request.json()
api, api_pool = connect_to_router(login["ip"], login["username"], login["password"], login["port"])
try:
cmd = f'/tool user-manager user add username="{data["username"]}" password="{data["password"]}" customer=admin shared-users=1'
out, err = run_ssh_command(login["ip"], login["username"], login["password"], 22, cmd)
if 'not found' in err.lower():
cmd = f'/user-manager user add username="{data["username"]}" password="{data["password"]}" customer=admin shared-users=1'
out, err = run_ssh_command(login["ip"], login["username"], login["password"], 22, cmd)
if err:
return JSONResponse({"message": f"خطأ في إضافة المستخدم: {err.strip()}"}, status_code=500)
if data.get("profile"):
time.sleep(1)
cmd_activate = f'/tool user-manager user create-and-activate-profile numbers={data["username"]} customer=admin profile={data["profile"]}'
out_activate, err_activate = run_ssh_command(login["ip"], login["username"], login["password"], 22, cmd_activate)
if err_activate:
return JSONResponse({"message": f"تم إضافة المستخدم لكن حدث خطأ في تفعيل البروفايل: {err_activate.strip()}"}, status_code=500)
return JSONResponse({"message": f"تمت إضافة المستخدم {data['username']} بنجاح"})
except Exception as e:
return JSONResponse({"message": str(e)}, status_code=500)
finally:
api_pool.disconnect()
@app.post("/api/add-vouchers-batch")
async def add_vouchers_batch(request: Request):
login = request.session.get("login")
if not login:
return JSONResponse({"message": "غير مصرح"}, status_code=401)
data = await request.json()
api, api_pool = connect_to_router(login["ip"], login["username"], login["password"], login["port"])
try:
vouchers = []
for _ in range(data["count"]):
max_attempts = 10
for attempt in range(max_attempts):
username = generate_username(data["prefix"], data["username_length"], data["username_type"])
password = generate_password(data["password_length"], data["password_type"])
try:
commands = [
f'/tool user-manager user add username="{username}" password="{password}" customer=admin shared-users=1'
]
if data.get("profile"):
commands.append(f'/tool user-manager user create-and-activate-profile numbers={username} customer=admin profile={data["profile"]}')
results = run_ssh_commands(login["ip"], login["username"], login["password"], 22, commands)
add_out, add_err = results[0][1], results[0][2]
if add_err:
if "already exists" in add_err.lower():
if attempt == max_attempts - 1:
vouchers.append({
"username": username,
"password": password,
"profile": data.get("profile", ""),
"status": "error",
"error": "تعذر توليد اسم مستخدم فريد بعد عدة محاولات"
})
continue
else:
vouchers.append({
"username": username,
"password": password,
"profile": data.get("profile", ""),
"status": "error",
"error": f"خطأ في إنشاء المستخدم: {add_err.strip()}"
})
break
if not data.get("profile"):
vouchers.append({
"username": username,
"password": password,
"profile": "",
"status": "success"
})
break
if len(results) > 1:
activate_out, activate_err = results[1][1], results[1][2]
if not activate_err and "error" not in activate_out.lower() and "no such item" not in activate_out.lower():
vouchers.append({
"username": username,
"password": password,
"profile": data["profile"],
"status": "success"
})
else:
vouchers.append({
"username": username,
"password": password,
"profile": data["profile"],
"status": "warning",
"error": f"تفعيل البروفايل فشل: {activate_err.strip()} {activate_out.strip()}"
})
break
except Exception as e:
vouchers.append({
"username": username,
"password": password,
"profile": data.get("profile", ""),
"status": "error",
"error": f"خطأ غير متوقع: {str(e)}"
})
break
successful = len([v for v in vouchers if v.get("status") == "success"])
warnings = len([v for v in vouchers if v.get("status") == "warning"])
errors = len([v for v in vouchers if v.get("status") == "error"])
message = f"تمت إضافة {successful} من {data['count']} Voucher بنجاح"
if warnings > 0:
message += f" ({warnings} تم إنشاؤهم مع تحذيرات)"
if errors > 0:
message += f" ({errors} فشل إنشاؤهم)"
return JSONResponse({
"message": message,
"vouchers": vouchers,
"stats": {
"total": data["count"],
"successful": successful,
"warnings": warnings,
"errors": errors
}
})
except Exception as e:
return JSONResponse({
"message": f"حدث خطأ: {str(e)}",
"vouchers": vouchers
}, status_code=500)
finally:
api_pool.disconnect()
@app.get('/api/test-um-ssh')
def test_um_ssh(request: Request):
login = request.session.get('login')
if not login:
return JSONResponse({'message': 'غير مصرح'}, status_code=401)
cmd1 = 'tool user-manager user add username=nnnb password=1234 customer=admin shared-users=1'
out1, err1 = run_ssh_command(login['ip'], login['username'], login['password'], 22, cmd1)
cmd2 = 'tool user-manager user create-and-activate-profile numbers=nnnb customer=admin profile=500YR-1.5G'
out2, err2 = run_ssh_command(login['ip'], login['username'], login['password'], 22, cmd2)
return JSONResponse({
'add_out': out1, 'add_err': err1,
'activate_out': out2, 'activate_err': err2
})
# ... باقي الكود كما هو