analitics

Pages

Showing posts with label sqlite3. Show all posts
Showing posts with label sqlite3. Show all posts

Monday, August 19, 2024

Python 3.12.1 : Web server with SQLite database using flask - update.

This update adds a /users URL that accepts query parameters; see the first tutorial for the original version:
from flask import Flask, request, jsonify, render_template_string
import sqlite3
from datetime import datetime

app = Flask(__name__)

# Class for the SQL server
class SQLiteServer:
    """Small data-access layer around a SQLite file holding a ``users`` table.

    Every operation opens its own connection and closes it in a ``finally``
    block, so a failing statement cannot leave the connection open.
    """

    def __init__(self, db_name):
        """Store the database path and make sure the ``users`` table exists.

        Args:
            db_name: Path of the SQLite database file.
        """
        self.db_name = db_name
        self.init_db()

    def init_db(self):
        """Create the ``users`` table when it does not exist yet."""
        conn = sqlite3.connect(self.db_name)
        try:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY,
                    first_name TEXT,
                    last_name TEXT,
                    occupation TEXT,
                    hobby TEXT,
                    year_of_birth INTEGER,
                    age INTEGER
                )
            ''')
            conn.commit()
        finally:
            conn.close()

    @staticmethod
    def _normalize_year(year_of_birth):
        """Expand a one- or two-digit year to a four-digit year.

        Values above the current two-digit year are taken as 19xx, all
        others as 20xx. Years outside 0..99 are returned unchanged.
        """
        # A numeric range check also covers single-digit input such as 5
        # (typed as "05"), which a string-length test would miss.
        if 0 <= year_of_birth < 100:
            pivot = datetime.now().year % 100
            return year_of_birth + (1900 if year_of_birth > pivot else 2000)
        return year_of_birth

    def calculate_age(self, year_of_birth):
        """Return the current year minus the (expanded) year of birth.

        Args:
            year_of_birth: Integer year; one- and two-digit values are
                expanded to four digits first.
        """
        return datetime.now().year - self._normalize_year(year_of_birth)

    def add_user(self, first_name, last_name, occupation, hobby, year_of_birth):
        """Insert one user row.

        The four-digit year is stored, so the ``year_of_birth`` and ``age``
        columns agree with each other.
        """
        year_of_birth = self._normalize_year(year_of_birth)
        age = self.calculate_age(year_of_birth)
        conn = sqlite3.connect(self.db_name)
        try:
            conn.execute('''
                INSERT INTO users (first_name, last_name, occupation, hobby, year_of_birth, age)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (first_name, last_name, occupation, hobby, year_of_birth, age))
            conn.commit()
        finally:
            conn.close()

    @staticmethod
    def _fetch_all_users(db_name):
        """Return every row of ``users`` from the database at *db_name*."""
        conn = sqlite3.connect(db_name)
        try:
            return conn.execute("SELECT * FROM users").fetchall()
        finally:
            conn.close()

    def get_users(self):
        """Return all rows of the ``users`` table as a list of tuples."""
        return self._fetch_all_users(self.db_name)

    @staticmethod
    def get_users_jsonify():
        """Return all users as a Flask JSON response.

        The function takes no ``self``, so it is declared static; that keeps
        ``SQLiteServer.get_users_jsonify()`` working and also makes it
        callable on an instance. It reads the fixed file
        ``sqlite_database.db``, not ``self.db_name``.
        """
        return jsonify(SQLiteServer._fetch_all_users('sqlite_database.db'))

# Class for the web server
# Columns that /users may sort by. ORDER BY cannot take a bound parameter,
# so the requested column is checked against this allow-list instead of
# being pasted into the SQL text.
_USER_COLUMNS = ('id', 'first_name', 'last_name', 'occupation', 'hobby', 'year_of_birth', 'age')


def _int_arg(args, name):
    """Return query argument *name* as an int, or None when it is absent.

    Raises:
        ValueError: The argument is present but is not an integer.
    """
    raw = args.get(name)
    if raw is None:
        return None
    try:
        return int(raw)
    except ValueError:
        raise ValueError(f"{name} must be an integer, got {raw!r}") from None


def _build_users_query(args, current_year):
    """Turn the /users query-string arguments into SQL plus bound parameters.

    Clauses are always emitted as WHERE, then ORDER BY, then LIMIT, whatever
    order the arguments had in the URL.

    Args:
        args: Mapping of query-string arguments (``request.args``).
        current_year: Year used to turn ``year_of_birth`` into an age.

    Returns:
        A ``(sql, params)`` tuple ready for ``cursor.execute``.

    Raises:
        ValueError: A numeric argument is not an integer, or ``order_by``
            names something other than a ``users`` column.
    """
    conditions = []
    params = []

    # Substring match on the text columns.
    for column in ('first_name', 'last_name', 'occupation', 'hobby'):
        value = args.get(column)
        if value:
            conditions.append(f'{column} LIKE ?')
            params.append(f'%{value}%')

    if args.get('year_of_birth'):
        conditions.append('year_of_birth = ?')
        params.append(_int_arg(args, 'year_of_birth'))

    # The age is derived from year_of_birth at query time.
    for name, operator in (('min_age', '>='), ('max_age', '<=')):
        bound = _int_arg(args, name)
        if bound is not None:
            conditions.append(f'(? - year_of_birth) {operator} ?')
            params.extend([current_year, bound])

    query = 'SELECT * FROM users'
    if conditions:
        query += ' WHERE ' + ' AND '.join(conditions)

    order_by = args.get('order_by')
    if order_by is not None:
        column, _, direction = order_by.strip().partition(' ')
        direction = direction.strip().upper()
        if column not in _USER_COLUMNS or direction not in ('', 'ASC', 'DESC'):
            raise ValueError(f"order_by must be one of {', '.join(_USER_COLUMNS)}")
        query += f' ORDER BY {column}'
        if direction:
            query += f' {direction}'

    limit = _int_arg(args, 'limit')
    if limit is not None:
        query += ' LIMIT ?'
        params.append(limit)

    return query, params


class WebServer:
    """Flask front end for the user database.

    The route handlers are registered on the module-level ``app`` while the
    class body executes. They are plain functions (no ``self``) and use the
    module-level ``sqlite_server`` object.
    """

    def __init__(self, sqlite_server):
        """Keep a reference to the SQL server object."""
        self.sqlite_server = sqlite_server

    def run(self):
        """Start the Flask development server in debug mode."""
        app.run(debug=True)

    # Add-user form plus a button that leads to the users page
    @app.route('/')
    def index():
        """Serve the page with the "Add User" form and the "Show Users" button."""
        # The link is relative so it works on any host and port.
        return render_template_string('''
            <!DOCTYPE html>
            <html lang="en">
            <head>
                <meta charset="UTF-8">
                <meta name="viewport" content="width=device-width, initial-scale=1.0">
                <title>Flask website for testing cypress with sqlite</title>
            </head>
            <body>
                <h2>Add User</h2>
                <form action="/add_user" method="post">
                    First Name: <input type="text" name="first_name"><br>
                    Last Name: <input type="text" name="last_name"><br>
                    Occupation: <input type="text" name="occupation"><br>
                    Hobby: <input type="text" name="hobby"><br>
                    Year of Birth: <input type="text" name="year_of_birth"><br>
                    <input type="submit" value="Add User">
                </form>
                <a href="/users"><button type="button">Show Users</button></a>
            </body>
            </html>
        ''')

    @app.route('/add_user', methods=['POST'])
    def add_user():
        """Store the submitted user; answer 400 when the year is not a number."""
        first_name = request.form['first_name']
        last_name = request.form['last_name']
        occupation = request.form['occupation']
        hobby = request.form['hobby']
        try:
            year_of_birth = int(request.form['year_of_birth'])
        except ValueError:
            return 'Year of birth must be a whole number. <a href="/">Go back</a>', 400
        sqlite_server.add_user(first_name, last_name, occupation, hobby, year_of_birth)
        return 'User added successfully! <a href="/">Go back</a>'

    @app.route('/users', methods=['GET'])
    def get_users():
        """Return users as JSON, optionally filtered, sorted and limited.

        Example URLs:
            Basic query: /users
            Simple query: /users?query_type=simple
            Advanced query: /users?query_type=advanced&first_name=John&occupation=Engineer
            Name search: /users?query_type=advanced&first_name=John&last_name=Doe
            By occupation: /users?query_type=advanced&occupation=Engineer
            By hobby: /users?query_type=advanced&hobby=Reading
            By year of birth: /users?query_type=advanced&year_of_birth=1990
            By minimum age: /users?query_type=advanced&min_age=30
            By maximum age: /users?query_type=advanced&max_age=50
            With ordering: /users?query_type=advanced&order_by=last_name
            With limit: /users?query_type=advanced&limit=10
            Combined: /users?query_type=advanced&first_name=John&occupation=Engineer&min_age=25&order_by=year_of_birth&limit=5

        Invalid arguments produce a JSON ``{"error": ...}`` body with status 400.
        """
        query_type = request.args.get('query_type', 'simple')

        if query_type == 'advanced':
            try:
                query, params = _build_users_query(request.args, datetime.now().year)
            except ValueError as e:
                return jsonify({"error": str(e)}), 400
        else:
            # Simple query logic
            query, params = 'SELECT * FROM users', []

        conn = sqlite3.connect(sqlite_server.db_name)
        try:
            c = conn.cursor()
            c.execute('SELECT name FROM sqlite_master WHERE type="table" AND name="users"')
            if not c.fetchone():
                return jsonify({"error": "Table 'users' does not exist"})
            c.execute(query, params)
            users = c.fetchall()
        except sqlite3.OperationalError as e:
            return jsonify({"error": str(e)})
        finally:
            conn.close()

        return jsonify(users)

# Instantiate the SQL server and the web server.
# The route handlers look up the module-level name `sqlite_server`, so it
# has to be bound before the first request is served.
sqlite_server = SQLiteServer('sqlite_database.db')
web_server = WebServer(sqlite_server)

# Start the Flask server only when this file is run as a script.
if __name__ == '__main__':
    web_server.run()

Wednesday, August 14, 2024

Python 3.12.1 : Web server with SQLite database using flask.

This Python source script can be used to start a web server backed by an SQLite database.
For example, you can use it to test JavaScript against an SQL-backed server; see the next image:
This is the source code:
from flask import Flask, request, jsonify, render_template_string
import sqlite3
from datetime import datetime

app = Flask(__name__)

# Class for the SQL server
class SQLServer:
    """Data-access helper for a SQLite file that stores a ``users`` table.

    Each method opens a connection of its own and closes it in ``finally``,
    so an exception in a statement does not leak the connection.
    """

    def __init__(self, db_name):
        """Store the database path and create the table if needed.

        Args:
            db_name: Path of the SQLite database file.
        """
        self.db_name = db_name
        self.init_db()

    def init_db(self):
        """Create the ``users`` table when it is missing."""
        conn = sqlite3.connect(self.db_name)
        try:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY,
                    first_name TEXT,
                    last_name TEXT,
                    occupation TEXT,
                    hobby TEXT,
                    year_of_birth INTEGER,
                    age INTEGER
                )
            ''')
            conn.commit()
        finally:
            conn.close()

    @staticmethod
    def _normalize_year(year_of_birth):
        """Expand a one- or two-digit year to four digits.

        Values above the current two-digit year become 19xx, the rest 20xx.
        Years outside 0..99 are returned unchanged.
        """
        # Range test instead of len(str(...)) == 2 so that 0..9 are handled too.
        if 0 <= year_of_birth < 100:
            pivot = datetime.now().year % 100
            return year_of_birth + (1900 if year_of_birth > pivot else 2000)
        return year_of_birth

    def calculate_age(self, year_of_birth):
        """Return the current year minus the (expanded) year of birth."""
        return datetime.now().year - self._normalize_year(year_of_birth)

    def add_user(self, first_name, last_name, occupation, hobby, year_of_birth):
        """Insert one user row, storing the four-digit year and the age."""
        year_of_birth = self._normalize_year(year_of_birth)
        age = self.calculate_age(year_of_birth)
        conn = sqlite3.connect(self.db_name)
        try:
            conn.execute('''
                INSERT INTO users (first_name, last_name, occupation, hobby, year_of_birth, age)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (first_name, last_name, occupation, hobby, year_of_birth, age))
            conn.commit()
        finally:
            conn.close()

    def get_users(self):
        """Return all rows of the ``users`` table as a list of tuples."""
        conn = sqlite3.connect(self.db_name)
        try:
            return conn.execute("SELECT * FROM users").fetchall()
        finally:
            conn.close()

# Class for the web server
class WebServer:
    """Flask front end that lists users and accepts new ones.

    The route handlers are registered on the module-level ``app`` while the
    class body executes. They are plain functions (no ``self``) and use the
    module-level ``sql_server`` object.
    """

    def __init__(self, sql_server):
        """Keep a reference to the SQL server object."""
        self.sql_server = sql_server

    def run(self):
        """Start the Flask development server in debug mode."""
        app.run(debug=True)

    @app.route('/')
    def index():
        """Render the list of stored users followed by the "Add User" form."""
        users = sql_server.get_users()
        return render_template_string('''
            <h1>Users</h1>
            <ul>
                {% for user in users %}
                    <li>{{ user[1] }} {{ user[2] }} - {{ user[3] }} - {{ user[4] }} - {{ user[5] }} ({{ user[6] }} years old)</li>
                {% endfor %}
            </ul>
            <h2>Add User</h2>
            <form action="/add_user" method="post">
                First Name: <input type="text" name="first_name"><br>
                Last Name: <input type="text" name="last_name"><br>
                Occupation: <input type="text" name="occupation"><br>
                Hobby: <input type="text" name="hobby"><br>
                Year of Birth: <input type="text" name="year_of_birth"><br>
                <input type="submit" value="Add User">
            </form>
        ''', users=users)

    @app.route('/add_user', methods=['POST'])
    def add_user():
        """Store the submitted user; answer 400 when the year is not a number."""
        first_name = request.form['first_name']
        last_name = request.form['last_name']
        occupation = request.form['occupation']
        hobby = request.form['hobby']
        # The year comes from a free-text input, so int() can fail; report
        # that to the client instead of letting it surface as a 500 error.
        try:
            year_of_birth = int(request.form['year_of_birth'])
        except ValueError:
            return 'Year of birth must be a whole number. <a href="/">Go back</a>', 400
        sql_server.add_user(first_name, last_name, occupation, hobby, year_of_birth)
        return 'User added successfully! <a href="/">Go back</a>'

# Instantiate the SQL server and the web server.
# The route handlers look up the module-level name `sql_server`, so it has
# to be bound before the first request is served.
sql_server = SQLServer('example.db')
web_server = WebServer(sql_server)

# Start the Flask server only when this file is run as a script.
if __name__ == '__main__':
    web_server.run()

Sunday, March 24, 2024

Python 3.12.1 : PrettyTable

A simple Python library for easily displaying tabular data in a visually appealing ASCII table format, see this webpage.
Let's install with pip tool:
pip install prettytable
Collecting prettytable
...
Successfully installed prettytable-3.10.0 wcwidth-0.2.13
Let's test with the default example:
from prettytable import PrettyTable
# Set the header once, then add the city rows in a loop and print the
# rendered ASCII table.
table = PrettyTable()
table.field_names = ["City name", "Area", "Population", "Annual Rainfall"]
for city_row in (
    ["Adelaide", 1295, 1158259, 600.5],
    ["Brisbane", 5905, 1857594, 1146.4],
    ["Darwin", 112, 120900, 1714.7],
    ["Hobart", 1357, 205556, 619.5],
    ["Sydney", 2058, 4336374, 1214.8],
    ["Melbourne", 1566, 3806092, 646.9],
    ["Perth", 5386, 1554769, 869.4],
):
    table.add_row(city_row)
print(table)
The result is this:
python test_001.py
+-----------+------+------------+-----------------+
| City name | Area | Population | Annual Rainfall |
+-----------+------+------------+-----------------+
|  Adelaide | 1295 |  1158259   |      600.5      |
|  Brisbane | 5905 |  1857594   |      1146.4     |
|   Darwin  | 112  |   120900   |      1714.7     |
|   Hobart  | 1357 |   205556   |      619.5      |
|   Sydney  | 2058 |  4336374   |      1214.8     |
| Melbourne | 1566 |  3806092   |      646.9      |
|   Perth   | 5386 |  1554769   |      869.4      |
+-----------+------+------------+-----------------+
Let's test with sqlite3 feature:
import sqlite3
from prettytable import from_db_cursor

# Print every table in file_paths.db together with all of its rows.
# `connection` is bound before the try block so the finally clause can
# tell whether there is anything to close if connect() itself fails.
connection = None
try:
    # Connect to the database
    connection = sqlite3.connect("file_paths.db")
    cursor = connection.cursor()

    # Get the list of table names
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
    table_names = cursor.fetchall()
    # Print each table name followed by its rows
    for (table_name,) in table_names:
        print(table_name)
        # Identifiers cannot be bound as parameters; quote the name so that
        # tables with spaces or reserved words in their name still work.
        quoted_name = '"' + table_name.replace('"', '""') + '"'
        cursor.execute(f"SELECT * FROM {quoted_name}")
        rows = cursor.fetchall()
        print(rows)
except sqlite3.Error as e:
    print(f"Error: {e}")

finally:
    # Close the connection
    if connection is not None:
        connection.close()
The result for my sqlite3 file named file_paths.db, which contains a table named files, is this:
files
[(1, 'C:/BACKUP/3D\\001.blend'), (2, 'C:/BACKUP/3D\\002.blend'), (3, 'C:/BACKUP/3D\\003.blend'), 
(4, 'C:/BACKUP/3D\\default_camera.blend')]
The result matches the data in the sqlite3 table.

Monday, September 9, 2019

Python 3.7.3 : Using the flask - part 018.

In this tutorial, I will show you how to fix auto increment in Flask SQLAlchemy.
The old source code for the user model from the server.py was this:
class User(db.Model):
    """User model declared on ``db.Model`` (``db`` is defined outside this snippet).

    This is the version without ``sqlite_autoincrement``.
    """
    # Integer primary key.
    id = db.Column(db.Integer, primary_key=True)
    # NOTE(review): every column below is declared unique=True, including
    # password, gender, work and city, so no two rows can share any of
    # these values. Confirm that this is intended.
    username = db.Column(db.String(80), unique=True)
    password = db.Column(db.String(120), unique=True)
    email = db.Column(db.String(120), unique=True)
    gender = db.Column(db.String(5), unique=True)
    work = db.Column(db.String(33), unique=True)
    city = db.Column(db.String(15), unique=True)
The server.sqlite will be this:
[mythcat@desk my_flask]$ sqlite3 server.sqlite 
SQLite version 3.26.0 2018-12-01 12:34:55
Enter ".help" for usage hints.
sqlite> .schema
CREATE TABLE user (
 id INTEGER NOT NULL, 
 username VARCHAR(80), 
 password VARCHAR(120), 
 email VARCHAR(120), 
 gender VARCHAR(5), 
 work VARCHAR(33), 
 city VARCHAR(15), 
 PRIMARY KEY (id), 
 UNIQUE (username), 
 UNIQUE (password), 
 UNIQUE (email), 
 UNIQUE (gender), 
 UNIQUE (work), 
 UNIQUE (city)
);
CREATE TABLE alembic_version (
 version_num VARCHAR(32) NOT NULL, 
 CONSTRAINT alembic_version_pkc PRIMARY KEY (version_num)
);
CREATE TABLE sqlite_stat1(tbl,idx,stat);
sqlite> ^Z 
If you want to change the id column to auto increment, you need to follow these steps:
class User(db.Model):
    """User model declared on ``db.Model`` (``db`` is defined outside this snippet).

    Same columns as the earlier model, with an explicit table name and
    SQLite AUTOINCREMENT enabled for the primary key.
    """
    __tablename__ = 'user'
    # Asks SQLAlchemy to emit AUTOINCREMENT for the integer primary key
    # when the table is created on SQLite.
    __table_args__ = {'sqlite_autoincrement': True}
    # Integer primary key.
    id = db.Column(db.Integer, primary_key=True)
    # NOTE(review): every column below is declared unique=True, including
    # password, gender, work and city. Confirm that this is intended.
    username = db.Column(db.String(80), unique=True)
    password = db.Column(db.String(120), unique=True)
    email = db.Column(db.String(120), unique=True)
    gender = db.Column(db.String(5), unique=True)
    work = db.Column(db.String(33), unique=True)
    city = db.Column(db.String(15), unique=True)
Delete the server.sqlite file or rename it.
Open python3 and create a new server.sqlite file:
[mythcat@desk my_flask]$ python3
Python 3.7.4 (default, Jul  9 2019, 16:32:37) 
[GCC 9.1.1 20190503 (Red Hat 9.1.1-1)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from server import db
>>> db.create_all()
>>> db.engine.table_names()
['sqlite_sequence', 'user']
>>> 
[5]+  Stopat                  python3 
Open the new file to see the changes:
[mythcat@desk my_flask]$ sqlite3 server.sqlite 
SQLite version 3.26.0 2018-12-01 12:34:55
Enter ".help" for usage hints.
sqlite> .schema
CREATE TABLE user (
 id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, 
 username VARCHAR(80), 
 password VARCHAR(120), 
 email VARCHAR(120), 
 gender VARCHAR(5), 
 work VARCHAR(33), 
 city VARCHAR(15), 
 UNIQUE (username), 
 UNIQUE (password), 
 UNIQUE (email), 
 UNIQUE (gender), 
 UNIQUE (work), 
 UNIQUE (city)
);
CREATE TABLE sqlite_sequence(name,seq);
sqlite>  

Tuesday, August 27, 2019

Python 3.7.3 : Using the flask - part 017.

Today I made some changes to my server.py and database and solved some issues; see the old version in my old tutorial.
The first issue was the start script.
I create a linux script named start_server.sh to run the flask run command:
[mythcat@desk my_flask]$ ./start_server.sh 
I updated the User model with new fields:
class User(db.Model):
    """User model declared on ``db.Model`` (``db`` is defined outside this snippet)."""
    # Integer primary key.
    id = db.Column(db.Integer, primary_key=True)
    # NOTE(review): every column below is declared unique=True, including
    # password, gender, work and city, so no two rows can share any of
    # these values. Confirm that this is intended.
    username = db.Column(db.String(80), unique=True)
    password = db.Column(db.String(120), unique=True)
    email = db.Column(db.String(120), unique=True)
    gender = db.Column(db.String(5), unique=True)
    work = db.Column(db.String(33), unique=True)
    city = db.Column(db.String(15), unique=True)
Let's see how I deal with this versus database and migrate process.
[mythcat@desk my_flask]$ rm server.sqlite 
[mythcat@desk my_flask]$ python3
Python 3.7.4 (default, Jul  9 2019, 16:32:37) 
[GCC 9.1.1 20190503 (Red Hat 9.1.1-1)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from server import db
>>> db.create_all()
>>> db.engine.table_names()
['user']
>>> exit()
[mythcat@desk my_flask]$ ls server.sqlite 
server.sqlite
[mythcat@desk my_flask]$ sqlite3 server.sqlite 
SQLite version 3.26.0 2018-12-01 12:34:55
Enter ".help" for usage hints.
sqlite> .tables
user
sqlite> .schema user
CREATE TABLE user (
        id INTEGER NOT NULL, 
        username VARCHAR(80), 
        password VARCHAR(120), 
        email VARCHAR(120), 
        gender VARCHAR(5), 
        work VARCHAR(33), 
        city VARCHAR(15), 
        PRIMARY KEY (id), 
        UNIQUE (username), 
        UNIQUE (password), 
        UNIQUE (email), 
        UNIQUE (gender), 
        UNIQUE (work), 
        UNIQUE (city)
);
I got a strange error:
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) duplicate column name: city
I fixed it with the following commands, but I'm not sure this is the right way:
[mythcat@desk my_flask]$ rm migrations/ -r -f 
[mythcat@desk my_flask]$ python3 server.py db init 
  Creating directory /home/mythcat/project_github/my_flask/migrations ... done
  Creating directory /home/mythcat/project_github/my_flask/migrations/versions ... done
  Generating /home/mythcat/project_github/my_flask/migrations/script.py.mako ... done
  Generating /home/mythcat/project_github/my_flask/migrations/env.py ... done
  Generating /home/mythcat/project_github/my_flask/migrations/alembic.ini ... done
  Generating /home/mythcat/project_github/my_flask/migrations/README ... done
  Please edit configuration/connection/logging settings in '/home/mythcat/project_github/my_flask/migrations/alembic.ini'
  before proceeding.
[mythcat@desk my_flask]$ python3 server.py db migrate
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.env] No changes in schema detected.
[mythcat@desk my_flask]$ python3 server.py db upgrade
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.

Saturday, August 3, 2019

Python 3.7.3 : Using the flask - part 007.

This will be a long tutorial because I will try to link together information from the previous tutorials.
First, the structure of the project can see into my GitHub project.
I create new templates and another python script named tserv.py for testing.
You can see easier how the POST method works and how to deal with a python database issue.
This script comes with an upload file feature and upload database with sqlite3 python module.
First, for upload a file we need the HTML5 file from templates folder named upload.html.
I update the base.html file to use the bootstrap framework.
import sqlite3
class UploadForm(FlaskForm):
    """Form with one file field and one submit button."""
    # File chosen by the user; upload() reads it through form.file.data.
    file = FileField()
    # Submit button labelled "submit".
    submit = SubmitField("submit")
    
@app.route('/upload',methods = ['GET','POST'])
def upload():
    """Show the upload page and store a submitted file in the database.

    On a valid POST the uploaded file's name and content are passed to
    ``file_database``. The upload page is rendered again in every case.
    """
    form = UploadForm()

    # validate_on_submit() already means "the request is a submission and
    # the form validates", so no separate method check or validate() call
    # is needed and the form is validated only once.
    if form.validate_on_submit():
        uploaded = form.file.data
        # Nothing to store when the form was posted without a file.
        if uploaded:
            file_database(name = uploaded.filename,data = uploaded.read())
            print("File {}".format(uploaded.filename))
    return render_template("upload.html", form = form)
def file_database(name,data):
    """Append one uploaded file to ``my_table`` in ``file_upload.db``.

    The table is created on first use. The connection is closed even when
    a statement raises.

    Args:
        name: File name stored in the ``name`` column.
        data: File content (bytes) stored in the ``data`` column.
    """
    con = sqlite3.connect("file_upload.db")
    try:
        # The column type is BLOB (it used to be misspelled "BLOP"). A
        # database created with the old spelling keeps its schema because
        # of IF NOT EXISTS.
        con.execute("""CREATE TABLE IF NOT EXISTS my_table (name TEXT, data BLOB) """)
        con.execute("""INSERT INTO my_table (name , data ) VALUES (?,?) """, (name, data))
        con.commit()
    finally:
        con.close()
Let's run it:
python tserv.py
A database will be created in the main folder and filled with the uploaded file.
Now, if you want to add more, we can create a download button in our UploadForm; there is no need for another Python class.
First create your form tag in the upload.html file and link it with the Python tserv.py code shown below:
from flask import send_file
from io import BytesIO
...
@app.route('/download', methods=['GET','POST'])
def download():
    """Send the first stored file as an attachment on POST.

    A GET request renders ``home.html``. When nothing has been uploaded
    yet (missing table or empty table) the response is a 404 message.
    """
    form = UploadForm()
    if request.method == "POST":
        con = sqlite3.connect("file_upload.db")
        try:
            # Only the first row is needed, so fetch one row.
            row = con.execute(""" SELECT * FROM my_table """).fetchone()
        except sqlite3.OperationalError:
            # The table is created by the first upload; treat "no such
            # table" like an empty table.
            row = None
        finally:
            con.close()
        if row is None:
            return "No file has been uploaded yet.", 404
        data = row[1]  # columns are (name, data)
        # NOTE(review): Flask 2.0 renamed `attachment_filename` to
        # `download_name`; switch the keyword when upgrading Flask.
        return send_file(BytesIO(data), attachment_filename='test', as_attachment=True)
    return render_template("home.html", form = form)
When you run it, the download button will download the first file that was uploaded into the database.
This example tutorial can be more complex.
For example, you can show the database content into a new HTML file and then save with an open dialog to disk.