Description
Sometimes you want or need to control who is accessing certain data on your server. Maybe everyone can see your homepage, but you have to be logged-in if you want to leave a comment. Or maybe you only want Admin users to be able to access certain features.
I’ll show you an example of how we can use Flask-Login to allow users to sign-up and login to our chat server. We’ll then use the name they entered when signing up instead of prompting them to enter a name in the chat window. We’ll also do some low-tech Role Access control in our example so that only Admin users can access the Reboot and Shutdown routes on our index page.
Our role-based security will be self-implemented, meaning that Flask-Login won’t help us in this effort. In addition, each user in our implementation can only have one role, as we will store the role as a column on the users table. I hope to show you Flask-User in a future post. It extends Flask-Login to give everything it provides, plus true role-based security. It also supports many roles for each user which can be very useful in certain situations.
We’re going to start with the Implementing Flask Web Sockets project which will give us a starting point to work with.
Parameters
This is some information about this project and the conditions under which it was done. If you try to replicate it in the future and it doesn’t work, you can evaluate these parameters to see if any changes between these and your configuration might have impacted your results. An example might be changes to future versions of Python or Flask.
- Date: 20 August, 2021
- Skill: Beginner+
- Raspberry Pi Model(s): Zero-W, 3B+, 4B
- OS: Raspberry Pi OS version 10 (Buster)
- Python Version: 3.7.3
- Flask Version: 2.0.1
Steps
- First we need to install the following libraries (the last two are new):
sudo pip3 install flask
sudo pip3 install flask-socketio
sudo pip3 install eventlet
sudo pip3 install flask-sqlalchemy
sudo pip3 install flask-login
- If you already have the code from the Implementing Web Sockets project, then you can open that version of sample.py and make the changes highlighted below. Otherwise, just copy the code below. The highlighted lines mostly represent the code needed to implement Flask-Login, but also a few lines were added to support adding our Chat users to a Room, and then disconnecting them if somehow the user chatting doesn’t match the current user for that session, just as a safety net.
Rooms can be used as a way to to direct messages to the correct groups of users (socket connections). In this case we only have one group, but I wanted to introduce the concept. You can imagine how you could use them in this example to support multiple chat rooms.
#!/usr/bin/env python3
#eventlet WSGI server
import eventlet
eventlet.monkey_patch()
#Flask Libs
from flask import Flask, render_template, request, redirect, url_for, flash
from werkzeug.security import generate_password_hash, check_password_hash
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager, UserMixin, login_required, current_user, login_user, logout_user
#Web Sockets
from flask_socketio import SocketIO, emit, join_room, leave_room, disconnect, rooms
#Datetime Lib
from datetime import datetime
#System Libs
import io
import os
import subprocess
#JSON Lib
import json
START_DATE = datetime.now().strftime('%m/%d/%y %I:%M:%S %p')
SERVER_NAME = 'Raspberry Pi Server'
PI_MODEL = 'Unknown'
CHAT_ROOM = 'chat-room'
proc = subprocess.Popen(["cat", "/sys/firmware/devicetree/base/model"], stdout=subprocess.PIPE)
(out, err) = proc.communicate()
if not err:
PI_MODEL = out.decode('utf-8')
else:
print("Error get Pi Model: %s" % err)
app = Flask(__name__)
socketio = SocketIO(app)
app.config['SECRET_KEY'] = 'abc123'
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///users.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False # Mute flask-sqlalchemy warning message
app.config['REMEMBER_COOKIE_SECURE'] = None # Change to True for production. Forces using HTTPS to store cookies.
app.config['REMEMBER_COOKIE_HTTPONLY'] = True # Prevents cookies from being accessed on the client-side.
db = SQLAlchemy(app)
login_manager = LoginManager()
login_manager.login_view = 'login'
login_manager.init_app(app)
class User(UserMixin, db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True) # primary keys are required by SQLAlchemy
email = db.Column(db.String(100), unique=True)
password = db.Column(db.String(100))
name = db.Column(db.String(1000))
role = db.Column(db.String(25))
db.create_all()
# Create first user
if not User.query.filter(User.email == 'jeff@example.com').first():
user = User(
email='jeff@example.com',
password=generate_password_hash('Password', method='sha256'),
name='Jeff',
role='Admin'
)
db.session.add(user)
db.session.commit()
@login_manager.user_loader
def load_user(user_id):
# since the user_id is just the primary key of our user table, use it in the query for the user
return User.query.get(int(user_id))
users = []
#Called when the browser sucessfully connects via a web socket
@socketio.on('connect')
def socket_connect():
data = json.dumps({'status': 'connected'})
emit('conn', data, broadcast=False) #Emitted back only to the new connection
#Called by the browser once the user has entered their username
@socketio.on('join')
def socket_join(msg):
print('User Role: %s' % current_user.role)
print('User: %s' % msg['username'])
if len(msg['username']) > 0:
users.append({'username': msg['username'], 'sid': request.sid}) #Keep track of the connected users
join_room(CHAT_ROOM)
#Emitted to all connections to announce the new user
emit('joined', json.dumps({'msg': '%s joined! (%d total users connected)' % (msg['username'], len(users))}), room=CHAT_ROOM)
user_list = ''
for user in users:
if len(user_list) > 0:
user_list += ', '
user_list += user['username']
#Emitted only to the new connection to announce who is already connected
emit('chat', json.dumps({'username': 'System', 'msg': 'Connected users: %s' % user_list}), broadcast=False)
#Called when the user types a message and presses Send
@socketio.on('chat_send')
def socket_chat(msg):
print('Chat: %s' % msg)
print('username (%s) / current_user (%s)' % (msg['username'], current_user.name))
if msg['username'] == current_user.name:
#Message is emitted to all connected users
emit('chat', json.dumps(msg), room=CHAT_ROOM)
else:
print('username (%s) is not the current_user (%s)' % (msg['username'], current_user.name))
leave_room(CHAT_ROOM)
disconnect()
#Called when a socket disconnects
@socketio.on('disconnect')
def socket_disconnect():
print('Disconnecting: %s' % request.sid)
for user in users:
if user['sid'] == request.sid:
print('Disconnecting: %s' % user['username'])
#Emmitted to all remained connected users to let them know a user has disconnected
emit('disconnected', json.dumps({'msg': '%s disconnected! (%d total users connected)' % (user['username'], len(users)-1)}), room=CHAT_ROOM)
users.remove(user) #Removed the disconnected user from the users list
@app.route('/')
def index():
serverData = {'serverName': SERVER_NAME, 'piModel': PI_MODEL}
return render_template('index.html', serverData=serverData)
@app.route('/login', methods=['GET'])
def login():
return render_template('login.html')
@app.route('/login', methods=['POST'])
def login_post():
email = request.form.get('email')
password = request.form.get('password')
remember = True if request.form.get('remember') else False
user = User.query.filter_by(email=email).first()
# check if the user actually exists
# take the user-supplied password, hash it, and compare it to the hashed password in the database
if not user or not check_password_hash(user.password, password):
flash('Please check your login details and try again.')
return redirect(url_for('login')) # if the user doesn't exist or password is wrong, reload the page
# if the above check passes, then we know the user has the right credentials
login_user(user, remember=remember)
return redirect(url_for('index'))
@app.route('/signup', methods=['GET'])
def signup():
return render_template('signup.html')
@app.route('/signup', methods=['POST'])
def signup_post():
email = request.form.get('email')
name = request.form.get('name')
password = request.form.get('password')
user = User.query.filter_by(email=email).first() # if this returns a user, then the email already exists in database
if user: # if a user is found, we want to redirect back to signup page so user can try again
flash('Email address already exists')
return redirect(url_for('login'))
# create a new user with the form data. Hash the password so the plaintext version isn't saved.
new_user = User(email=email, name=name, password=generate_password_hash(password, method='sha256'))
# add the new user to the database
db.session.add(new_user)
db.session.commit()
return redirect(url_for('login'))
@app.route('/logout')
@login_required
def logout():
logout_user()
return redirect(url_for('index'))
@app.route('/healthcheck')
def healthcheck():
return render_template('health.html')
@app.route('/health-data')
def healthData():
curDateTime = datetime.now().strftime('%m/%d/%y %-I:%M:%S %p')
resp = {'serverName': SERVER_NAME, 'piModel': PI_MODEL, 'startDate': START_DATE, 'curDate': curDateTime}
#Get ip.txt contents
try:
f = open('/home/pi/ip.txt', 'r')
fdata = f.readlines()
fcnt = 'Fail Cnt: %s' % (fdata[0].strip())
rcnt = 'Reboot Cnt: %s' % (fdata[1].strip())
f.close()
except:
fcnt = 'Fail Cnt: No Data'
rcnt = 'Reboot Cnt: No Data'
resp['fcnt'] = fcnt
resp['rcnt'] = rcnt
#Get reboot.txt contents
try:
f = open('/home/pi/reboot.txt', 'r')
fdata = f.readlines()
rhist = ''
for x in range(len(fdata)):
rhist += fdata[x]
f.close()
except:
rhist = 'None'
resp['rhist'] = rhist
# Report available disk space
stat = os.statvfs('/home/pi')
gbFree = '%0.2f GB' % (stat.f_bfree*stat.f_bsize/1024/1024/1024)
resp['gbFree'] = gbFree
# Report CPU Temp
try:
tFile = open('/sys/class/thermal/thermal_zone0/temp')
temp = float(tFile.read())
tempC = '%0.1f C' % (temp/1000)
tempF = '%0.1f F' % ((temp/1000) * 1.8 + 32)
except:
tempC = 'ERR'
tempF = 'ERR'
resp['tempC'] = tempC
resp['tempF'] = tempF
tFile.close()
return json.dumps(resp)
@app.route('/chat')
@login_required
def chat():
return render_template('chat.html')
@app.route('/reboot')
@login_required
def reboot():
if current_user.role == 'Admin':
ret = os.system('sudo reboot')
return 'Rebooting'
flash("You must be an Admin to Reboot!")
return redirect(url_for('login'))
@app.route('/shutdown')
@login_required
def shutdown():
if current_user.role == 'Admin':
ret = os.system('sudo shutdown -h now')
return 'Shutting down'
flash("You must be an Admin to Shutdown!")
return redirect(url_for('login'))
if __name__ == '__main__':
#app.run(debug=True, host='0.0.0.0', port=5000)
socketio.run(app, debug=True, host='0.0.0.0', port=5000)
You can see that we added the @login_required decorator to the routes that we only want to exposed to logged-in users. And for the Reboot and Shutdown routes, we also checked to make sure their role was Admin before proceeding. If they’re not, we flash a message and redirect to the Login route. Flash is used to pass messages to the jinja template engine so they can be displayed to the user in HTML. You’ll see how they get pulled in when we look at login.html and signup.html.
- Next we’ll take a look at index.html. Here’s our first glimpse at some additional jinja features. We already know how to pass parameters using the double sets of curly braces ({{ variable }}), but now we see how we can implement some control logic using the {% statement %} operators. We start with {% if current_user.is_authenticated %} to see if the current user has been authenticated and, if so, we add the user name/role, the link to the Chat route, and a Logout link. If you’re not logged-in you won’t see those listed, but instead you will see the {% else %} conditional elements (Login and Signup). Also note that we are checking to make sure the current user is an Admin before showing the Reboot and Shutdown links ({% if current_user.role == “Admin” %}). We also included @login_required decorators in the server code for these routes so that users can’t access them directly through the browser.
<!DOCTYPE html>
<html>
<head>
<style>
body {
background-color: lightblue;
color: blue;
font-size: 24px;
}
</style>
</head>
<body>
<h1>{{serverData['serverName']}}</h1>
<h2> [{{serverData['piModel']}}]</h2>
{% if current_user.is_authenticated %}
<h3>Logged-In User: {{ current_user.name }} [{{ current_user.role|default("User", True) }}]</h3>
<p><a href="/chat">Chat</a></p>
<p><a href="{{ url_for('logout') }}">Logout</a></p>
{% else %}
<p><a href="{{ url_for('login') }}">Login</a></p>
<p><a href="{{ url_for('signup') }}">Signup</a></p>
{% endif %}
<p><a href="/healthcheck">Health Check</a></p>
{% if current_user.role == "Admin" %}
<p><a href="/reboot">Reboot</a></p>
<p><a href="/shutdown">Shutdown</a></p>
{% endif %}
</body>
</html>
- Now we’ll look at the change in chat.html. It’s not even a required change, but you’ll notice is that we changed the variable username to a constant and we are setting it to the value of current_user.name. This is to help prevent a malicious user from changing the value of username. But if they do somehow, we double-check it back in the server code to make sure it matches the current user and will disconnect their socket if it’s different.
<!DOCTYPE html>
<html>
<head>
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="apple-mobile-web-app-capable" content="yes">
<title>Simple Chat</title>
<script type="text/javascript" src="//cdnjs.cloudflare.com/ajax/libs/socket.io/4.1.3/socket.io.min.js"></script>
<style>
body {
background-color: lightblue;
color: blue;
font-size: 24px;
}
#chat {
background-color: white;
border: solid 1px black;
height: 500px;
width: 500px;
font-size: 18px;
overflow: scroll;
}
#chat-text {
margin-top: 20px;
}
#text-area {
width: 410px;
height: 30px;
}
#send {
height: 35px;
width: 90px;
font-size: 20px;
}
</style>
</head>
<body>
<h3 id="chat-title">Chat Window</h3>
<div id="chat"></div>
<div id="chat-text">
<label for="text-area">Send your message!</label><br />
<input type="text" id="text-area" autofocus placeholder="Enter message here...">
<button type="button" id="send" onclick="sendClick()">Send</button>
</div>
<p><a href="/">Index</a></p>
<script>
var chatTitle = document.getElementById("chat-title");
var msgBox = document.getElementById("text-area");
var chatWindow = document.getElementById("chat");
var sendButton = document.getElementById("send");
//Listen for the user pressing Enter while in the message box
msgBox.addEventListener("keyup", function(event) {
if (event.keyCode === 13) {
event.preventDefault();
sendButton.click();
}});
const DOW_SHORT = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"];
function getTimeDisplay(dt) {
var time_str = "";
var hrs = "";
var mins = "";
var ap = "AM";
var val = 0;
val = dt.getHours();
if (val >=12) {
ap = "PM";
if (val > 12) {
val -= 12;
}
}
else if (val == 0) {
val = 12;
}
hrs = val.toString();
val = dt.getMinutes();
mins = ("0" + val).slice(-2);
time_str = hrs + ":" + mins + " " + ap;
return time_str;
}
var protocol = window.location.protocol;
var socketURL = protocol + '//' + document.domain + ':' + location.port;
console.log("Socket URL: " + socketURL);
//Request a web socket connection with the server
var socket = io.connect(socketURL, {
reconnection: true,
reconnectionDelay: 1000,
reconnectionDelayMax: 10000,
reconnectionAttempts: 99999
});
const username = "{{ current_user.name }}";
//Called by the server once it accepts the connection
socket.on('conn', function(msg) {
chatTitle.style.color = "blue"; //Set title to Blue to indicate a good connection
var dt = new Date();
var dtDisp = DOW_SHORT[dt.getDay()] + " " + getTimeDisplay(dt);
var data = JSON.parse(msg);
console.log(dtDisp + " - " + data.status);
//Only prompt for the username if we don't have it already.
//If we lose the connection and get it back, we don't need to ask again.
if (!username) {
username = prompt("Please enter your username");
if (username != null) {
console.log("Hello " + username + "!");
}
}
if (username) {
//Tell the server our username
socket.emit('join', {'username': username});
//Update our title with our username
chatTitle.textContent = "Chat Window [" + username + "]";
}
});
//Called when our socket is disconnected
socket.on('disconnect', function(msg) {
chatTitle.style.color = "red"; //Set the title to Red to indicate our connection has been lost
console.log("disconnect: " + msg);
});
//Call by the server whenever a new user joins
socket.on('joined', function(msg) {
console.log("joined: " + msg);
var dt = new Date();
var dtDisp = DOW_SHORT[dt.getDay()] + " " + getTimeDisplay(dt);
var data = JSON.parse(msg);
var dataDisp = dtDisp + ": " + data.msg;
console.log(dataDisp);
//Show join messages with a yellow background
chatWindow.innerHTML += "<span style='background-color:yellow'>" + dataDisp + "</span><br />";
});
//Called by the server to announce that a user has disconnected
socket.on('disconnected', function(msg) {
console.log("disconnected: " + msg);
var dt = new Date();
var dtDisp = DOW_SHORT[dt.getDay()] + " " + getTimeDisplay(dt);
var data = JSON.parse(msg);
var dataDisp = dtDisp + ": " + data.msg;
console.log(dataDisp);
//Show disconnection messages with a red background
chatWindow.innerHTML += "<span style='background-color:red'>" + dataDisp + "</span><br />";
});
//Called by the server to broadcast a message that a user has sent
socket.on('chat', function(msg) {
console.log("chat: " + msg);
var dt = new Date();
var dtDisp = DOW_SHORT[dt.getDay()] + " " + getTimeDisplay(dt);
var data = JSON.parse(msg);
//If this is our message, display the background as light gray
var style = username == data.username ? "background-color: lightgray" : "";
var dataDisp = `<span style='${style}'>${dtDisp}: [${data.username}] ${data.msg}</span>`;
console.log(dataDisp);
chatWindow.innerHTML += dataDisp + "<br />";
chatWindow.scrollBy(0,30); //Scroll the window down to keep last items in view
});
//Called when the Send button is pressed or the user presses Enter in the message box
function sendClick() {
if (msgBox.value.length > 0) {
socket.emit('chat_send', {'username': username, 'msg': msgBox.value}); //Emit the message
}
msgBox.value = ""; //Clear the current text
msgBox.focus(); //Set focus back to the message box
}
</script>
</body>
</html>
Probably the most likely scenario for the current_user.name and the username to get out of sync is from having multiple browsers open on your device and logging in and out and possibly even cycling the server in the middle. You’ll notice that if you open one browser and log-in, then open a second browser, the first user will still be logged-in. That is because Flask-Login uses a session cookie that is specific for the address used (i.e. http://192.168.2.221:5000/), and the cookie doesn’t expire until either the expiration date/time is reached or the user logs out. If you just close your browser and then open a new one and go to your address, you will still be logged in. You’ll also notice a Remember Me check-box on the login form. If you check that, your session cookie will be valid for 365 days, or until you log out or a different user logs in on that browser, which will overwrite the session cookie.
Back to the sync problem… You now have two browsers open on your device and both are logged-in for the same user, then you log one out and log-in as a different user. Now if you click on the Chat link for both you will notice that they both say the last user to be logged in. The same if you then click on the Index link, they both say the same user. That’s because the session cookie was updated to be the last user that logged in.
But if you leave one in the Chat window (that has an active socket connection), and return to the index page on the other and then log-out and back in as another user and go to Chat, you will see that both continue to work as the separate users. That’s because the server knows that they have both been authenticated and it is maintaining the proper session context for each without the use of the session cookie (it’s not passed in sockets). But as soon as you go back to the index page on both browsers, the session cookie comes back into play and they will both become the same, last logged-in user again.
This is mainly a problem only with testing, but I wanted to make you aware of what was happening in case you see it.
- Now we’re going to create base.html. It looks just like any other HTML document until you get down to the body and see the {% block content %} {% endblock %} lines. This is another jinja feature that allows you to create a base template that can be extended by other templates. In our case both login.html and signup.html will extend this template. This is commonly used to give a consistent look and feel between pages, such as with menus, headers, footers, etc. In this example we are pulling in a third-party stylesheet (Bulma) to help us with our form styling. Save this file in your templates folder.
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Flask Auth Example</title>
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/bulma/0.7.2/css/bulma.min.css" />
<style>
html {
background-color: lightblue;
}
</style>
</head>
<body>
<div class="container has-text-centered">
{% block content %}
{% endblock %}
</div>
</body>
</html>
- Let’s cover login.html now. Also save this file in your templates folder. As you can see, the first line is {% extends “base.html” %} which lets jinja know which template to pull in. Then we have the code between the {% block content %} {% endblock %}, which is basically pasting that code into the matching block in base.html.
I mentioned previously while reviewing the server code that we can use flash to send messages to jinja for use in our HTML. You see how we access those messages with the {% with messages = get_flashed_messages() %} statement. Below we are only pulling the first message if there are any, but if we wanted or needed to we could loop through all messages and show them to the user. The rest is just a basic HTML form which will call our Login route with a POST method when submitted.
{% extends "base.html" %}
{% block content %}
<div class="column is-4 is-offset-4">
<h3 class="title">Login</h3>
<div class="box">
{% with messages = get_flashed_messages() %}
{% if messages %}
<div class="notification is-danger">
{{ messages[0] }}
</div>
{% endif %}
{% endwith %}
<form method="POST" action="/login">
<form method="POST" action="/login">
<div class="field">
<div class="control">
<input class="input is-large" type="email" name="email" placeholder="Your Email" autofocus="">
</div>
</div>
<div class="field">
<div class="control">
<input class="input is-large" type="password" name="password" placeholder="Your Password">
</div>
</div>
<div class="field">
<label class="checkbox">
<input type="checkbox" name="remember">
Remember me
</label>
</div>
<button class="button is-block is-info is-large is-fullwidth">Login</button>
<button class="button is-block is-light is-large is-fullwidth"><a href="/">Cancel</a></button>
</form>
</div>
</div>
{% endblock %}
- Our signup.html is basically the same as login.html, but it calls the signup route on submit. Save this file in your templates folder.
{% extends "base.html" %}
{% block content %}
<div class="column is-4 is-offset-4">
<h3 class="title">Sign Up</h3>
<div class="box">
{% with messages = get_flashed_messages() %}
{% if messages %}
<div class="notification is-danger">
{{ messages[0] }}. Go to <a href="{{ url_for('login') }}">login page</a>.
</div>
{% endif %}
{% endwith %}
<form method="POST" action="/signup">
<form method="POST" action="/signup">
<div class="field">
<div class="control">
<input class="input is-large" type="email" name="email" placeholder="Email" value="{{ email }}" autofocus="">
</div>
</div>
<div class="field">
<div class="control">
<input class="input is-large" type="text" name="name" placeholder="Name" autofocus="">
</div>
</div>
<div class="field">
<div class="control">
<input class="input is-large" type="password" name="password" placeholder="Password">
</div>
</div>
<button class="button is-block is-info is-large is-fullwidth">Sign Up</button>
<button class="button is-block is-light is-large is-fullwidth"><a href="/">Cancel</a></button>
</form>
</div>
</div>
{% endblock %}
- Now start your server and test from the browser. If you didn’t change the server code I provided, you have a default user with an email of jeff@example.com and a password of Password. This user will be an Admin. Note that there’s no way to create an Admin user through the signup process. You would need to make that change by updating the users table directly.
Summary
Hopefully you were able to successfully complete this project and see how you can leverage Flask-Login to add authentication to your Flask server. For simple use cases where you don’t need role-access or only need users to belong to a single role, you can implement your own role-access controls.
I hope you found this project useful and can leverage the concepts in future projects!