pátek 19. července 2024

BTC graph in terminal :) funny graph updated every second

 This Python script provides a real-time visual display of Bitcoin (BTC) price data using the `curses` library to create a terminal-based interface. Here’s a summary of its functionality:


1. **Price Fetching**: It fetches the latest BTC/USDT price from the Binance API.


2. **Indicators Calculation**:

   - **RSI (Relative Strength Index)**: Calculates and displays the RSI to gauge the price momentum.

   - **Bollinger Bands**: Computes and displays Bollinger Bands, which include the upper, middle, and lower bands, to analyze price volatility.


3. **Visualization**:

   - **Price Plot**: Plots the price changes as a line graph.

   - **Bollinger Bands**: Draws the Bollinger Bands on the graph.

   - **RSI**: Displays the RSI value at the bottom of the screen.


4. **Display Updates**: Continuously updates the graph and indicators every second and shows the current, maximum, and minimum prices at the top-right corner.


5. **Exit**: Allows the user to exit the program by pressing the 'q' key.


The script runs in a terminal environment and uses `curses` to handle real-time screen updates and user interactions.

import requests

import curses

import time

import numpy as np


# Function to get the latest BTC/USDT price from Binance

def get_btcusdt_price(timeout=5.0):
    """Return the latest BTC/USDT price from the Binance ticker endpoint.

    Args:
        timeout: Seconds to wait for Binance before giving up.

    Returns:
        The ``price`` field of the JSON response converted to ``float``.

    Raises:
        requests.RequestException: On connection problems, timeouts or a
            non-2xx HTTP status.
        KeyError: If the JSON payload has no ``price`` field.
        ValueError: If the body is not JSON or the price is not numeric.
    """
    url = 'https://api.binance.com/api/v3/ticker/price'
    params = {'symbol': 'BTCUSDT'}

    # requests waits indefinitely unless a timeout is given, which would
    # freeze a caller that polls this function in a refresh loop.
    response = requests.get(url, params=params, timeout=timeout)

    # Surface HTTP errors directly instead of failing later with a
    # confusing KeyError on an error payload.
    response.raise_for_status()

    return float(response.json()['price'])


# Function to calculate RSI

def calculate_rsi(prices, period=14):
    """Return the Relative Strength Index of *prices*.

    The first ``period`` price changes seed the average gain and loss; each
    later change is folded in with the ``(prev * (period - 1) + new) / period``
    smoothing step.

    Args:
        prices: Chronologically ordered sequence of prices.
        period: Number of price changes in the look-back window.

    Returns:
        The RSI as a float between 0 and 100. 50.0 (neutral) is returned when
        there are fewer than ``period + 1`` prices or when the prices never
        moved; 100.0 is returned when there were gains and no losses.
    """
    # A period-long RSI needs `period` price changes, i.e. period + 1 prices.
    if len(prices) <= period:
        return 50.0

    deltas = np.diff(prices)
    seed = deltas[:period]
    avg_gain = seed[seed >= 0].sum() / period
    avg_loss = -seed[seed < 0].sum() / period

    for delta in deltas[period:]:
        gain = delta if delta > 0 else 0
        loss = -delta if delta < 0 else 0
        avg_gain = (avg_gain * (period - 1) + gain) / period
        avg_loss = (avg_loss * (period - 1) + loss) / period

    if avg_loss == 0:
        # No losses at all: RS is unbounded, so RSI saturates at 100.
        # With no gains either the series is flat, which is neutral.
        return 100.0 if avg_gain > 0 else 50.0

    rs = avg_gain / avg_loss
    return float(100 - (100 / (1 + rs)))


# Function to calculate Bollinger Bands

def calculate_bollinger_bands(prices, period=20):
    """Compute Bollinger Bands over the most recent *period* prices.

    Args:
        prices: Sequence of prices, oldest first.
        period: Number of trailing prices used for the mean and deviation.

    Returns:
        ``(upper, middle, lower)`` where ``middle`` is the mean of the window
        and the outer bands lie two population standard deviations away.
        ``(None, None, None)`` when fewer than *period* prices are available.
    """
    if len(prices) < period:
        return None, None, None  # Not enough data

    window = prices[-period:]
    center = np.mean(window)
    spread = 2 * np.std(window)

    return center + spread, center, center - spread


# Function to draw the RSI value as a text label
def draw_rsi(stdscr, rsi, max_y, max_x):
    """Write ``RSI: <value>`` right-aligned on row ``max_y - 1``.

    Args:
        stdscr: curses window to draw on.
        rsi: RSI value; shown with two decimals.
        max_y: Window height in rows.
        max_x: Window width in columns.

    Nothing is drawn when the window is not wider than the label plus a
    two-column margin.
    """
    label = f'RSI: {rsi:.2f}'
    column = max_x - len(label) - 2

    # column > 0 is the same condition as max_x > len(label) + 2.
    if column <= 0:
        return

    attributes = curses.A_BOLD | curses.color_pair(5)
    stdscr.addstr(max_y - 1, column, label, attributes)


# Function to draw fixed Bollinger Bands with values

def draw_bollinger_bands(stdscr, prices, upper_band, middle_band, lower_band, max_y, min_price, price_range, max_x):
    """Draw the three Bollinger Bands as labelled horizontal ``#`` lines.

    Args:
        stdscr: curses window to draw on.
        prices: Price history (not used by the drawing itself).
        upper_band, middle_band, lower_band: Band values, or ``None`` when
            the bands could not be computed; nothing is drawn in that case.
        max_y: Window height in rows.
        min_price: Price mapped to the bottom row.
        price_range: Price span mapped onto the window height (non-zero).
        max_x: Window width in columns.

    A band whose row falls outside ``0 .. max_y - 1`` is skipped, and curses
    errors raised while drawing a band are ignored.
    """
    # (label prefix, value, colour pair) in drawing order.
    bands = (
        ('U', upper_band, 1),
        ('M', middle_band, 2),
        ('L', lower_band, 3),
    )
    if any(value is None for _, value, _ in bands):
        return

    def to_row(value):
        return max_y - 1 - int((value - min_price) / price_range * (max_y - 1))

    # Resolve every row before drawing anything.
    rows = [to_row(value) for _, value, _ in bands]

    for (prefix, value, pair), row in zip(bands, rows):
        if not 0 <= row < max_y:
            continue
        label = f'{prefix}: {value:.2f}'
        try:
            stdscr.addstr(row, 0, label, curses.A_BOLD | curses.color_pair(pair))
            filler = '#' * (max_x - len(label) - 1)
            stdscr.addstr(row, len(label), filler, curses.color_pair(pair))
        except curses.error:
            pass


# Function to initialize the screen and start drawing the graph

def draw_graph(stdscr):
    """Run the live BTC/USDT chart on *stdscr* until 'q' is pressed.

    Every pass fetches a price with get_btcusdt_price(), appends it to a
    rolling history, redraws the Bollinger Bands, the RSI label, the price
    line and the price/max/min summary, then waits for a key press.

    Args:
        stdscr: curses window to draw on (main() supplies it through
            curses.wrapper).
    """
    curses.curs_set(0)
    # getch() waits at most 1000 ms, which also paces the refresh loop.
    stdscr.timeout(1000)

    curses.init_pair(1, curses.COLOR_RED, curses.COLOR_BLACK)
    curses.init_pair(2, curses.COLOR_BLUE, curses.COLOR_BLACK)
    curses.init_pair(3, curses.COLOR_GREEN, curses.COLOR_BLACK)
    curses.init_pair(4, curses.COLOR_YELLOW, curses.COLOR_BLACK)
    curses.init_pair(5, curses.COLOR_YELLOW, curses.COLOR_BLACK)

    prices = []

    while True:
        # Re-read the size on every pass so the layout follows resizes
        # instead of drawing with stale dimensions.
        max_y, max_x = stdscr.getmaxyx()
        stdscr.clear()

        try:
            prices.append(get_btcusdt_price())
            fetch_failed = False
        except (requests.RequestException, KeyError, ValueError):
            # A transient network/API failure must not kill the dashboard;
            # keep showing the history and flag the problem on screen.
            fetch_failed = True

        # Keep at most one sample per usable column, but never fewer than
        # one so min()/max() below always have data on tiny terminals.
        capacity = max(max_x - 2, 1)
        del prices[:-capacity]

        if prices:
            price = prices[-1]
            max_price = max(prices)
            min_price = min(prices)
            # A flat history would otherwise divide by zero when scaling.
            price_range = (max_price - min_price) or 1

            upper_band, middle_band, lower_band = calculate_bollinger_bands(prices)
            rsi = calculate_rsi(prices)

            try:
                draw_bollinger_bands(stdscr, prices, upper_band, middle_band, lower_band,
                                     max_y, min_price, price_range, max_x)
                draw_rsi(stdscr, rsi, max_y, max_x)

                # Every price lies within [min_price, max_price], so each
                # row lands inside 0 .. max_y - 1.
                rows = [
                    max_y - 1 - int((p - min_price) / price_range * (max_y - 1))
                    for p in prices
                ]
                for column, (y1, y2) in enumerate(zip(rows, rows[1:])):
                    if y1 == y2:
                        stdscr.addch(y1, column, '-')
                    else:
                        for y in range(min(y1, y2), max(y1, y2) + 1):
                            stdscr.addch(y, column, '|')
            except curses.error:
                # Drawing can still fail if the terminal shrinks mid-frame;
                # the next pass redraws with the new size.
                pass

            # Current, max and min prices in the top-right corner.
            current_price_str = f'Price: {price:.2f}'
            max_price_str = f'Max: {max_price:.2f}'
            min_price_str = f'Min: {min_price:.2f}'
            summary_attr = curses.A_BOLD | curses.color_pair(4)
            try:
                stdscr.addstr(1, max_x - len(current_price_str) - 2, current_price_str, summary_attr)
                stdscr.addstr(2, max_x - len(max_price_str) - 2, max_price_str, summary_attr)
                stdscr.addstr(3, max_x - len(min_price_str) - 2, min_price_str, summary_attr)
            except curses.error:
                pass

        if fetch_failed:
            try:
                stdscr.addstr(0, 0, 'Price fetch failed, retrying...',
                              curses.A_BOLD | curses.color_pair(1))
            except curses.error:
                pass

        stdscr.refresh()

        # Exit on 'q'
        if stdscr.getch() == ord('q'):
            break


def main():

    """Run draw_graph through curses.wrapper."""

    curses.wrapper(draw_graph)


if __name__ == "__main__":

    # Start the chart only when the file is executed as a script.

    main()

Python Script for BMW OBD2

Python Script for BMW OBD2

July 19, 2024

can you create python script, which will connect to my bmw x1 2018 via bluetooth to OBD2 and display a simple dashboard, which will constantly display and update all main data from the car (i.e. speed, temperatures, and other service data from car, fuel consumption, etc...)

ChatGPT

Creating a Python script to connect to your BMW X1 2018 via Bluetooth to an OBD2 device and display a dashboard is a multi-step process. This involves:

  1. Setting up the Bluetooth OBD2 Connection:
    • Using a Bluetooth OBD2 adapter to connect to your car.
    • Utilizing a Python library to communicate with the OBD2 adapter.
  2. Retrieving Data:
    • Using the OBD2 protocols to get data such as speed, temperature, fuel consumption, etc.
  3. Displaying the Dashboard:
    • Creating a simple graphical dashboard to display the data, which can be done using libraries such as tkinter for simplicity or dash for a more web-like interface.

Below is an example script that demonstrates how to achieve this using the obd library for OBD2 communication and tkinter for the dashboard.

Prerequisites

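  1. Install the necessary Python library (tkinter is part of the Python standard library and is not installed with pip; on some Linux distributions it comes from the system package python3-tk):

    pip install obd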


Script

python

Copy code

import obd

import tkinter as tk

from tkinter import StringVar


# Function to update data from OBD2 and refresh the dashboard

def update_dashboard():
    """Query the OBD2 adapter once, refresh the labels and reschedule itself.

    Reads speed, RPM, coolant temperature and fuel level through the
    module-level ``connection`` and writes the formatted readings into the
    module-level ``StringVar`` objects. A reading whose value is ``None`` is
    skipped, so its label keeps the previous text. The function then asks
    ``root`` to call it again after 1000 ms.
    """
    speed_response = connection.query(obd.commands.SPEED)
    rpm_response = connection.query(obd.commands.RPM)
    temp_response = connection.query(obd.commands.COOLANT_TEMP)
    fuel_response = connection.query(obd.commands.FUEL_LEVEL)

    # Compare against None instead of relying on truthiness: a reading of
    # zero (e.g. a stationary car) is falsy and would never be displayed.
    # NOTE(review): python-OBD values are pint quantities, and pint is
    # understood to refuse bool() for offset units such as degrees Celsius.
    if speed_response.value is not None:
        speed_var.set(f"Speed: {speed_response.value.to('mph')}")
    if rpm_response.value is not None:
        rpm_var.set(f"RPM: {rpm_response.value}")
    if temp_response.value is not None:
        temp_var.set(f"Coolant Temp: {temp_response.value.to('celsius')}")
    if fuel_response.value is not None:
        # Print only the magnitude; formatting the quantity itself would
        # append its unit name and produce "... percent%".
        fuel_var.set(f"Fuel Level: {fuel_response.value.magnitude:.1f}%")

    # Schedule the update_dashboard function to be called again after 1000ms
    root.after(1000, update_dashboard)


# Initialize the OBD2 connection
connection = obd.OBD()  # auto-connects to the first available OBD port

# Build the main window
root = tk.Tk()
root.title("Car Dashboard")

# One text variable per reading; update_dashboard() writes into these.
speed_var = StringVar()
rpm_var = StringVar()
temp_var = StringVar()
fuel_var = StringVar()

# Stack one label per reading, top to bottom: speed, RPM, coolant, fuel.
for _reading_var in (speed_var, rpm_var, temp_var, fuel_var):
    tk.Label(root, textvariable=_reading_var, font=("Helvetica", 16)).pack()

# Fill the labels once; update_dashboard() reschedules itself afterwards.
update_dashboard()

# Hand control to the Tk event loop
root.mainloop()

Explanation

  1. Connecting to OBD2:
    • connection = obd.OBD() establishes a connection to the OBD2 adapter. The obd library automatically searches for the Bluetooth OBD2 device.
  2. Updating the Dashboard:
    • The update_dashboard function queries the OBD2 adapter for speed, RPM, coolant temperature, and fuel level using predefined OBD commands.
    • The retrieved data is displayed on the dashboard using tkinter labels. The function is scheduled to run every second to ensure real-time updates.
  3. Creating the GUI:
    • The tkinter library is used to create a simple graphical interface that displays the car's data.
    • StringVar variables are used to hold and update the data displayed in the labels.

Additional Notes

  • Ensure your Bluetooth OBD2 adapter is compatible with your BMW X1 and properly paired with your computer.
  • The obd library might need specific configurations depending on your OBD2 adapter and car model.
  • This script is basic and can be extended to include more data points or a more sophisticated user interface.

This script should give you a starting point for developing a real-time dashboard for your car using Python.


pátek 12. července 2024

Crypto balances retrieval script in Python

Script Documentation: Crypto Balances Retrieval

Script Documentation: Crypto Balances Retrieval

Overview

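This script retrieves the crypto balances held on the Binance, KuCoin, and Coinbase exchanges, calculates their total USD value, stores each snapshot in a local SQLite database, and prints an ASCII graph of the daily average total.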

Script Code


import ccxt
from coinbase.wallet.client import Client
import json
import sqlite3
from datetime import datetime
import numpy as np
import matplotlib.pyplot as plt

# Load API keys from configuration file
def load_api_keys():
    """Read ``keys.json`` from the current directory and return its parsed content."""
    with open('keys.json') as config_file:
        return json.load(config_file)

# Read the credentials once at import time
keys = load_api_keys()

# One ccxt Binance client per entry in the "binance" list of the config
binance_exchanges = [
    ccxt.binance({
        'apiKey': account['api_key'],
        'secret': account['api_secret'],
    })
    for account in keys['binance']
]

# Single KuCoin client; the passphrase is handed to ccxt as 'password'
kucoin_exchange = ccxt.kucoin({
    'apiKey': keys['kucoin']['api_key'],
    'secret': keys['kucoin']['api_secret'],
    'password': keys['kucoin']['passphrase'],
})

# Coinbase uses its own client library rather than ccxt
coinbase_client = Client(
    keys['coinbase']['api_key'],
    keys['coinbase']['api_secret'],
)

# SQLite database initialization
def init_db():
    """Open ``balances.db`` and make sure the ``balances`` table exists.

    Returns:
        The open ``sqlite3`` connection.
    """
    connection = sqlite3.connect('balances.db')
    connection.execute('''CREATE TABLE IF NOT EXISTS balances
                 (id INTEGER PRIMARY KEY, exchange TEXT, balance_usd REAL, timestamp DATETIME)''')
    connection.commit()
    return connection


# Shared connection used by the save/query helpers in this script
conn = init_db()

# Function to retrieve current USD balance from Binance
def get_binance_usd_balance():
    """Sum the USDT value of every positive balance across all Binance clients.

    USDT is counted at face value; any other asset is priced with the
    ``last`` field of its ``<asset>/USDT`` ticker. An asset that cannot be
    priced is skipped.

    Returns:
        The summed value as a float, or ``None`` (after printing the error)
        when fetching the balances themselves fails.
    """
    try:
        running_total = 0.0
        for client in binance_exchanges:
            balances = client.fetch_balance()
            for asset in balances['total']:
                if not balances['total'][asset] > 0:
                    continue
                try:
                    if asset == 'USDT':
                        value = balances['total'][asset]
                    else:
                        quote = client.fetch_ticker(f"{asset}/USDT")
                        if not (quote and 'last' in quote and quote['last'] is not None):
                            raise ValueError("Ticker data not available")
                        value = balances['total'][asset] * quote['last']
                    running_total += value
                except Exception:
                    # Best effort: an asset without a usable ticker is ignored.
                    continue
        return running_total
    except Exception as e:
        print(f"Error fetching balances from Binance: {str(e)}")
        return None

# Function to retrieve current USD balance from KuCoin
def get_kucoin_usd_balance():
    """Sum the USDT value of every positive balance on KuCoin.

    USDT is counted at face value; any other coin is priced with the
    ``last`` field of its ``<coin>/USDT`` ticker. A coin that cannot be
    priced is skipped.

    Returns:
        The summed value as a float, or ``None`` (after printing the error)
        when fetching the balances fails.
    """
    try:
        usd_sum = 0.0
        balances = kucoin_exchange.fetch_balance()
        for coin in balances['total']:
            if not balances['total'][coin] > 0:
                continue
            try:
                if coin == 'USDT':
                    worth = balances['total'][coin]
                else:
                    quote = kucoin_exchange.fetch_ticker(f"{coin}/USDT")
                    if not (quote and 'last' in quote and quote['last'] is not None):
                        raise ValueError("Ticker data not available")
                    worth = balances['total'][coin] * quote['last']
                usd_sum += worth
            except Exception:
                # Best effort: a coin without a usable ticker is ignored.
                continue
        return usd_sum
    except Exception as e:
        print(f"Error fetching balances from KuCoin: {str(e)}")
        return None

# Function to retrieve current USD balance from Coinbase
def get_coinbase_usd_balance():
    """Sum the USD value of every positive Coinbase account balance.

    USD and USDT balances are counted at face value; any other currency is
    priced with the ``<currency>-USD`` spot price. An account that cannot
    be priced is skipped.

    Returns:
        The summed value as a float, or ``None`` (after printing the error)
        when listing the accounts fails.
    """
    try:
        usd_sum = 0.0
        accounts = coinbase_client.get_accounts()
        for account in accounts['data']:
            amount = float(account['balance']['amount'])
            currency = account['balance']['currency']
            if not amount > 0:
                continue
            try:
                if currency in ('USD', 'USDT'):
                    worth = amount
                else:
                    spot = coinbase_client.get_spot_price(currency_pair=f"{currency}-USD")
                    if not spot or spot['amount'] is None:
                        raise ValueError("Ticker data not available")
                    worth = amount * float(spot['amount'])
                usd_sum += worth
            except Exception:
                # Best effort: an account without a usable price is ignored.
                continue
        return usd_sum
    except Exception as e:
        print(f"Error fetching balances from Coinbase: {str(e)}")
        return None

# Function to calculate total USD value of all tokens
def get_total_usd_value():
    """Collect the per-exchange USD totals and their sum.

    An exchange whose helper returns ``None`` (or zero) contributes 0.0.

    Returns:
        ``(binance, kucoin, coinbase, total)`` as floats.
    """
    binance_part = get_binance_usd_balance() or 0.0
    kucoin_part = get_kucoin_usd_balance() or 0.0
    coinbase_part = get_coinbase_usd_balance() or 0.0
    return (
        binance_part,
        kucoin_part,
        coinbase_part,
        binance_part + kucoin_part + coinbase_part,
    )

# Function to save balances into the database
def save_balances(binance_usd, kucoin_usd, coinbase_usd, total_usd):
    """Store one snapshot (three exchanges plus the total) in ``balances``.

    All four rows share the same timestamp and are written in a single
    transaction on the module-level ``conn``.

    Args:
        binance_usd: USD value held on Binance.
        kucoin_usd: USD value held on KuCoin.
        coinbase_usd: USD value held on Coinbase.
        total_usd: Sum of the three values.
    """
    # Convert the timestamp to text explicitly: sqlite3's implicit datetime
    # adapter is deprecated since Python 3.12. isoformat(" ") is the format
    # that adapter produced, so new rows match previously stored ones.
    timestamp = datetime.now().isoformat(" ")
    rows = [
        ('Binance', binance_usd, timestamp),
        ('KuCoin', kucoin_usd, timestamp),
        ('Coinbase', coinbase_usd, timestamp),
        ('Total', total_usd, timestamp),
    ]
    # "with conn" commits on success and rolls back if any insert fails, so
    # a snapshot is never left half-written.
    with conn:
        conn.executemany(
            "INSERT INTO balances (exchange, balance_usd, timestamp) VALUES (?, ?, ?)",
            rows,
        )

# Function to retrieve daily averages from the database
def get_daily_averages():
    """Return the average 'Total' balance per calendar day, oldest day first.

    Returns:
        A list of ``(day, avg_balance)`` rows read through the module-level
        ``conn``, where ``day`` is the result of SQLite's ``date(timestamp)``.
    """
    c = conn.cursor()
    # ORDER BY makes the chronological order explicit; GROUP BY alone does
    # not promise any row order, and the graph plots rows left to right.
    c.execute(
        "SELECT date(timestamp) as day, AVG(balance_usd) as avg_balance "
        "FROM balances WHERE exchange = 'Total' GROUP BY day ORDER BY day"
    )
    return c.fetchall()

# Function to create ASCII daily portfolio graph
def create_ascii_graph(data):
    """Print a 10-row ASCII column chart of daily balances.

    Args:
        data: Sequence of ``(day, balance)`` rows; one column is drawn per
            row, in the given order.

    Each of the 10 rows marks the columns whose balance reaches that row's
    threshold between the minimum and maximum balance. Below the chart a
    dashed axis and the column indexes modulo 10 are printed. With no rows
    a short notice is printed instead of a chart.
    """
    if not data:
        # max()/min() would raise ValueError on an empty history.
        print("No balance history to plot.")
        return

    balances = [row[1] for row in data]
    min_balance = min(balances)
    range_balance = max(balances) - min_balance
    height = 10
    width = len(balances)

    for level in range(height, 0, -1):
        threshold = min_balance + level * range_balance / height
        print("".join("█" if balance >= threshold else " " for balance in balances))

    print("-" * width)
    print("".join(str(i % 10) for i in range(width)))  # X-axis labels (0-9 repeated)

# Run calculations, save to DB, and print total USD value
if __name__ == "__main__":
    # Take one snapshot of all exchanges and persist it before reporting.
    snapshot = get_total_usd_value()
    save_balances(*snapshot)
    binance_usd_balance, kucoin_usd_balance, coinbase_usd_balance, total_usd_value = snapshot

    print(f"Total USD value on Binance: {binance_usd_balance}")
    print(f"Total USD value on KuCoin: {kucoin_usd_balance}")
    print(f"Total USD value on Coinbase: {coinbase_usd_balance}")
    print(f"\nTotal USD value of all tokens: {total_usd_value}")

    # Plot the stored daily averages, including the snapshot just saved.
    create_ascii_graph(get_daily_averages())

Example JSON Configuration

{
  "binance": [
    {
      "api_key": "your_binance_api_key_here",
      "api_secret": "your_binance_api_secret_here"
    }
  ],
  "kucoin": {
    "api_key": "your_kucoin_api_key_here",
    "api_secret": "your_kucoin_api_secret_here",
    "passphrase": "your_kucoin_passphrase_here"
  },
  "coinbase": {
    "api_key": "your_coinbase_api_key_here",
    "api_secret": "your_coinbase_api_secret_here"
  }
}

hledej.me » Tykač

Google
 

počet návštěv blogu za posledních 7 dní

Oblíbené příspěvky

hledej.me