In this tutorial you will learn how to implement a web push notification service with Python and Flask. You can clone the webpush-python-flask repository directly or follow the step-by-step guide below.

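Table of Contents

Whether you are implementing a push service from scratch or integrating one into an existing application, this post will help you reach your goal. It covers the following sections:

  • How web push works in Python
  • Steps to send/receive web push notifications in Python
  • REST APIs for communication between client and push service
  • Setting up the project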

How web push works in Python

At a high level, web push needs three parties/components to work:

  • Client-side application: gets the user's permission, obtains the user's subscription token, and sends it to the backend service.
  • Push service: validates push requests coming from the backend service and forwards the push message to the appropriate browser.
  • Backend service: persists users' subscription information and initiates push sending.

Steps to send/receive web push notifications in Python

  • The user accepts the push permission and the browser generates a push subscription token by communicating with the Push API.
  • The client app sends the subscription information to the backend service, which persists it for use in the next steps.
  • The backend service initiates the push and sends the payload to the specific push service (which is denoted in the user's subscription information).
  • The push service receives the push notification and forwards it to the specific user, and the browser displays the notification.
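
The subscription information that moves through these steps is a small JSON object. As a minimal sketch, assuming the usual shape produced by the browser (an `endpoint` URL plus `p256dh` and `auth` keys), a backend could sanity-check it before storing it. The helper name `is_valid_subscription` and the sample values are hypothetical and not part of the repository.

```python
import json
from urllib.parse import urlparse

def is_valid_subscription(subscription):
    """Return True if the dict looks like a browser push subscription."""
    if not isinstance(subscription, dict):
        return False
    endpoint = subscription.get("endpoint")
    keys = subscription.get("keys")
    if not isinstance(endpoint, str) or not isinstance(keys, dict):
        return False
    # The endpoint identifies the push service; assume it must be an HTTPS URL
    parsed = urlparse(endpoint)
    if parsed.scheme != "https" or not parsed.netloc:
        return False
    # Both keys are needed to encrypt the payload for this browser
    return all(
        isinstance(keys.get(name), str) and bool(keys[name])
        for name in ("p256dh", "auth")
    )

# Illustrative values only; a real subscription comes from the browser
sample = json.loads("""
{
  "endpoint": "https://push.example.com/send/abc123",
  "keys": {"p256dh": "BASE64URL_PUBLIC_KEY", "auth": "BASE64URL_AUTH_SECRET"}
}
""")
print(is_valid_subscription(sample))
```

A check like this only confirms the structure; it cannot tell whether the subscription is still active at the push service.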

REST APIs for communication between client and push service

We will build a REST interface that communicates with the client application and the push service. It will store users' subscription information and distribute the VAPID public key.

VAPID is short for Voluntary Application Server Identification; the generated public key will be used by the client app. We will need to develop the following API endpoints:

  • GET /subscription/: returns the VAPID public key
  • POST /subscription/: stores subscription information
  • POST /push_v1/: sends a test push to the subscription supplied in the request body (used for testing)
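
To make the "store subscription information" part concrete, here is a minimal sketch of an in-memory store keyed by the subscription's endpoint URL. The class name `SubscriptionStore` is hypothetical, and a real service would most likely use a database instead; the point is only that the endpoint works as a natural unique key.

```python
import threading

class SubscriptionStore:
    """In-memory store for push subscriptions, keyed by endpoint URL."""

    def __init__(self):
        self._lock = threading.Lock()
        self._subscriptions = {}

    def add(self, subscription):
        # Re-subscribing with the same endpoint overwrites the old entry
        with self._lock:
            self._subscriptions[subscription["endpoint"]] = subscription

    def remove(self, endpoint):
        # Removing an unknown endpoint is a no-op
        with self._lock:
            self._subscriptions.pop(endpoint, None)

    def all(self):
        # Return a copy so callers can iterate while other threads modify the store
        with self._lock:
            return list(self._subscriptions.values())

store = SubscriptionStore()
store.add({"endpoint": "https://push.example.com/send/abc123", "keys": {}})
store.add({"endpoint": "https://push.example.com/send/abc123", "keys": {}})
print(len(store.all()))  # 1, because both entries share one endpoint
```

Everything in this store is lost when the process restarts, so it is only suitable for local experiments.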

Let's set up the project

You can refer to the source code in the webpush-python-flask repository.

Your directory structure should look like this:

-- static 
----- images 
------- badge.png 
------- icon.png 
----- index.css 
----- main.js 
----- sw.js 
-- templates 
----- index.html 
-- main.py

Open your terminal in the project directory before proceeding.

For this tutorial, you will be working with Python 3 and a virtual environment. If you don't have Python 3 installed, or are new to virtual environments, check a guide on installing Python 3 and setting up a virtual environment first.

First, create and activate a virtual environment:

virtualenv -p python3 venv  
source venv/bin/activate

Now install the following requirements:

# req.txt can be found in the repository mentioned above
pip install -r req.txt

Generate the VAPID private key with the following command:

openssl ecparam -name prime256v1 -genkey -noout -out vapid_private.pem

Create base64-encoded DER representations of the keys:

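# '>' overwrites the file, so re-running the command does not append a second key;
# tr -d '=\n' also removes any line break that base64 inserts into long output
openssl ec -in ./vapid_private.pem -outform DER | tail -c +8 | head -c 32 | base64 | tr -d '=\n' | tr '/+' '_-' > private_key.txt
openssl ec -in ./vapid_private.pem -pubout -outform DER | tail -c 65 | base64 | tr -d '=\n' | tr '/+' '_-' > public_key.txt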
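
Before wiring the keys into the backend, it can help to confirm that the two files contain what the commands are meant to produce. The sketch below assumes the keys are unpadded URL-safe base64, as generated above, and only checks their decoded sizes (32 bytes for a P-256 private key, 65 bytes starting with `0x04` for an uncompressed public key). The function names are hypothetical, and the check does not prove that the two keys belong to the same pair.

```python
import base64

def decode_urlsafe_b64(value):
    """Decode URL-safe base64 that has had its '=' padding stripped."""
    value = "".join(value.split())  # drop any stray whitespace or line breaks
    padding = "=" * (-len(value) % 4)
    return base64.urlsafe_b64decode(value + padding)

def check_vapid_keys(private_key_b64, public_key_b64):
    """Return True if the decoded keys have the sizes expected for P-256."""
    private_raw = decode_urlsafe_b64(private_key_b64)
    public_raw = decode_urlsafe_b64(public_key_b64)
    # The uncompressed public point is a 0x04 marker byte plus two 32-byte coordinates
    return len(private_raw) == 32 and len(public_raw) == 65 and public_raw[0] == 0x04

# Dummy byte strings with the right sizes, standing in for real key files
dummy_private = base64.urlsafe_b64encode(bytes(32)).decode().rstrip("=")
dummy_public = base64.urlsafe_b64encode(b"\x04" + bytes(64)).decode().rstrip("=")
print(check_vapid_keys(dummy_private, dummy_public))
```

With real keys, you would pass the contents of private_key.txt and public_key.txt instead of the dummy values.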

These VAPID keys will be used in the backend service. We will use the pywebpush library to send the web push notifications, wrapping its webpush call with the newly generated keys in a helper function, send_web_push, which is defined in main.py below.

Now let's create a Flask app that makes use of send_web_push.

We will create these files, following the directory structure above:

  • main.py
  • main.js
  • sw.js
  • index.html

main.py

import logging
import json, os

from flask import request, Response, render_template, jsonify, Flask
from pywebpush import webpush, WebPushException

app = Flask(__name__)
app.config['SECRET_KEY'] = '9OLWxND4o83j4K4iuopO'

DER_BASE64_ENCODED_PRIVATE_KEY_FILE_PATH = os.path.join(os.getcwd(),"private_key.txt")
DER_BASE64_ENCODED_PUBLIC_KEY_FILE_PATH = os.path.join(os.getcwd(),"public_key.txt")

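# Read the keys with context managers so the file handles are closed promptly
with open(DER_BASE64_ENCODED_PRIVATE_KEY_FILE_PATH, "r") as private_key_file:
    VAPID_PRIVATE_KEY = private_key_file.readline().strip("\n")

with open(DER_BASE64_ENCODED_PUBLIC_KEY_FILE_PATH, "r") as public_key_file:
    VAPID_PUBLIC_KEY = public_key_file.read().strip("\n")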

VAPID_CLAIMS = {
    # Replace with a contact address (mailto:) or URL (https:) for your own service
    "sub": "mailto:[email protected]"
}

def send_web_push(subscription_information, message_body):
    return webpush(
        subscription_info=subscription_information,
        data=message_body,
        vapid_private_key=VAPID_PRIVATE_KEY,
        vapid_claims=VAPID_CLAIMS
    )

@app.route('/')
def index():
    return render_template('index.html')

@app.route("/subscription/", methods=["GET", "POST"])
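def subscription():
    """
        POST stores a subscription.
        GET returns the VAPID public key, which clients use to subscribe to push notifications.
    """

    if request.method == "GET":
        return Response(response=json.dumps({"public_key": VAPID_PUBLIC_KEY}),
            headers={"Access-Control-Allow-Origin": "*"}, content_type="application/json")

    # get_json's first positional argument is `force`, so pass it by name;
    # force=True parses the body as JSON regardless of the Content-Type header
    subscription_token = request.get_json(force=True)
    # TODO: persist subscription_token (e.g. in a database) so it can be used for later pushes
    return Response(status=201, mimetype="application/json")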

@app.route("/push_v1/",methods=['POST'])
def push_v1():
    message = "Push Test v1"
    # silent=True returns None instead of raising when the body is not valid JSON
    payload = request.get_json(silent=True)
    print("is_json",request.is_json)

    if not payload or not payload.get('sub_token'):
        return jsonify({'failed':1})

    print("request.json",payload)

    token = payload.get('sub_token')
    try:
        # The client sends the subscription as a JSON string, so decode it first
        token = json.loads(token)
        send_web_push(token, message)
        return jsonify({'success':1})
    except Exception as e:
        print("error",e)
        return jsonify({'failed':str(e)})

if __name__ == "__main__":
    app.run(host="0.0.0.0",port=8080)
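
The push_v1 handler above treats every failure the same way. As a rough sketch of how a backend might react differently depending on the HTTP status returned by the push service, the helper below maps status codes to a follow-up action. The function name, the action labels, and the exact grouping of codes are assumptions for illustration; how you obtain the status code from a failed send depends on the library (pywebpush's WebPushException is documented as carrying the response when one is available).

```python
# Status codes commonly used for a subscription that no longer exists
GONE_STATUS_CODES = {404, 410}

def classify_push_failure(status_code):
    """Map a push service HTTP status to a hypothetical follow-up action."""
    if status_code in GONE_STATUS_CODES:
        return "delete"   # subscription expired or the user unsubscribed
    if status_code == 429 or 500 <= status_code < 600:
        return "retry"    # rate limited or a temporary server problem
    return "report"       # anything else likely needs a code or config fix

for code in (410, 429, 400):
    print(code, classify_push_failure(code))
```

Deleting subscriptions that the push service reports as gone keeps the stored list from filling up with dead entries.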

static/main.js

'use strict';

// const applicationServerPublicKey = "BNbxGYNMhEIi9zrneh7mqV4oUanjLUK3m+mYZBc62frMKrEoMk88r3Lk596T0ck9xlT+aok0fO1KXBLV4+XqxYM=";
const pushButton = document.querySelector('.js-push-btn');

let isSubscribed = false;
let swRegistration = null;

function urlB64ToUint8Array(base64String) {
    const padding = '='.repeat((4 - base64String.length % 4) % 4);
    const base64 = (base64String + padding)
        .replace(/\-/g, '+')
        .replace(/_/g, '/');

    const rawData = window.atob(base64);
    const outputArray = new Uint8Array(rawData.length);

    for (let i = 0; i < rawData.length; ++i) {
        outputArray[i] = rawData.charCodeAt(i);
    }
    return outputArray;
}

function updateBtn() {
    if (Notification.permission === 'denied') {
        pushButton.textContent = 'Push Messaging Blocked.';
        pushButton.disabled = true;
        updateSubscriptionOnServer(null);
        return;
    }

    if (isSubscribed) {
        pushButton.textContent = 'Disable Push Messaging';
    } else {
        pushButton.textContent = 'Enable Push Messaging';
    }

    pushButton.disabled = false;
}

function updateSubscriptionOnServer(subscription) {
    // TODO: Send subscription to application server

    const subscriptionJson = document.querySelector('.js-subscription-json');
    const subscriptionDetails =
        document.querySelector('.js-subscription-details');

    if (subscription) {
        subscriptionJson.textContent = JSON.stringify(subscription);
        subscriptionDetails.classList.remove('is-invisible');
    } else {
        subscriptionDetails.classList.add('is-invisible');
    }
}

function subscribeUser() {
    const applicationServerPublicKey = localStorage.getItem('applicationServerPublicKey');
    const applicationServerKey = urlB64ToUint8Array(applicationServerPublicKey);
    swRegistration.pushManager.subscribe({
            userVisibleOnly: true,
            applicationServerKey: applicationServerKey
        })
        .then(function(subscription) {
            console.log('User is subscribed.');

            updateSubscriptionOnServer(subscription);
            localStorage.setItem('sub_token',JSON.stringify(subscription));
            isSubscribed = true;

            updateBtn();
        })
        .catch(function(err) {
            console.log('Failed to subscribe the user: ', err);
            updateBtn();
        });
}

function unsubscribeUser() {
    swRegistration.pushManager.getSubscription()
        .then(function(subscription) {
            if (subscription) {
                return subscription.unsubscribe();
            }
        })
        .catch(function(error) {
            console.log('Error unsubscribing', error);
        })
        .then(function() {
            updateSubscriptionOnServer(null);

            console.log('User is unsubscribed.');
            isSubscribed = false;

            updateBtn();
        });
}

function initializeUI() {
    pushButton.addEventListener('click', function() {
        pushButton.disabled = true;
        if (isSubscribed) {
            unsubscribeUser();
        } else {
            subscribeUser();
        }
    });

    // Set the initial subscription value
    swRegistration.pushManager.getSubscription()
        .then(function(subscription) {
            isSubscribed = !(subscription === null);

            updateSubscriptionOnServer(subscription);

            if (isSubscribed) {
                console.log('User IS subscribed.');
            } else {
                console.log('User is NOT subscribed.');
            }

            updateBtn();
        });
}

if ('serviceWorker' in navigator && 'PushManager' in window) {
    console.log('Service Worker and Push is supported');

    navigator.serviceWorker.register("/static/sw.js")
        .then(function(swReg) {
            console.log('Service Worker is registered', swReg);

            swRegistration = swReg;
            initializeUI();
        })
        .catch(function(error) {
            console.error('Service Worker Error', error);
        });
} else {
    console.warn('Push messaging is not supported');
    pushButton.textContent = 'Push Not Supported';
}

function push_message() {
    console.log("sub_token", localStorage.getItem('sub_token'));
    $.ajax({
        type: "POST",
        url: "/push_v1/",
        contentType: 'application/json; charset=utf-8',
        dataType:'json',
        data: JSON.stringify({'sub_token':localStorage.getItem('sub_token')}),
        success: function( data ){
            console.log("success",data);
        },
        error: function( jqXhr, textStatus, errorThrown ){
            console.log("error",errorThrown);
        }
    });
}

$(document).ready(function(){
    $.ajax({
        type:"GET",
        url:'/subscription/',
        success:function(response){
            console.log("response",response);
            localStorage.setItem('applicationServerPublicKey',response.public_key);
        }
    })
});

static/sw.js

'use strict';

/* eslint-disable max-len */

// const applicationServerPublicKey = "BNbxGYNMhEIi9zrneh7mqV4oUanjLUK3m+mYZBc62frMKrEoMk88r3Lk596T0ck9xlT+aok0fO1KXBLV4+XqxYM=";

/* eslint-enable max-len */

function urlB64ToUint8Array(base64String) {
  const padding = '='.repeat((4 - base64String.length % 4) % 4);
  const base64 = (base64String + padding)
    .replace(/\-/g, '+')
    .replace(/_/g, '/');

  // Service workers have no window object; atob is available on self
  const rawData = self.atob(base64);
  const outputArray = new Uint8Array(rawData.length);

  for (let i = 0; i < rawData.length; ++i) {
    outputArray[i] = rawData.charCodeAt(i);
  }
  return outputArray;
}

self.addEventListener('push', function(event) {
  console.log('[Service Worker] Push Received.');
  console.log(`[Service Worker] Push had this data: "${event.data.text()}"`);

  const title = 'Push Codelab';
  const options = {
    body: `"${event.data.text()}"`,
    icon: 'images/icon.png',
    badge: 'images/badge.png'
  };

  event.waitUntil(self.registration.showNotification(title, options));
});

self.addEventListener('notificationclick', function(event) {
  console.log('[Service Worker] Notification click Received.');

  event.notification.close();

  event.waitUntil(
    clients.openWindow('https://developers.google.com/web/')
  );
});

self.addEventListener('pushsubscriptionchange', function(event) {
  console.log('[Service Worker]: \'pushsubscriptionchange\' event fired.');
  // localStorage is not available in service workers, so fetch the public key from the backend
  event.waitUntil(
    fetch('/subscription/')
    .then(function(response) {
      return response.json();
    })
    .then(function(data) {
      return self.registration.pushManager.subscribe({
        userVisibleOnly: true,
        applicationServerKey: urlB64ToUint8Array(data.public_key)
      });
    })
    .then(function(newSubscription) {
      // TODO: Send to application server
      console.log('[Service Worker] New subscription: ', newSubscription);
    })
  );
});

templates/index.html

<!DOCTYPE html>
<html>

<head>
    <meta charset="utf-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">

    <title>Push Notification | Raturi</title>

    <link rel="stylesheet" href="https://fonts.googleapis.com/icon?family=Material+Icons">
    <link rel="stylesheet" href="https://code.getmdl.io/1.2.1/material.indigo-pink.min.css">
    <script defer src="https://code.getmdl.io/1.2.1/material.min.js"></script>
    <script src="https://code.jquery.com/jquery-3.4.1.min.js" integrity="sha256-CSXorXvZcTkaix6Yvo6HppcZGetbYMGWSFlBw8HfCJo=" crossorigin="anonymous"></script>
    <link rel="stylesheet" href="{{ url_for('static', filename='index.css') }}">
</head>

<body>

    <header>
        <h1>WebPush Notification</h1>
    </header>

    <main>
        <p>Welcome to the webpush notification. The button below needs to be
            fixed to support subscribing to push.</p>
        <p>
            <button disabled class="js-push-btn mdl-button mdl-js-button mdl-button--raised mdl-js-ripple-effect">
                Enable Push Messaging
            </button>
        </p>
        <section class="subscription-details js-subscription-details is-invisible">
            <p>Once you've subscribed your user, you'd send their subscription to your
                server to store in a database so that when you want to send a message
                you can lookup the subscription and send a message to it.</p>
            <pre><code class="js-subscription-json"></code></pre>

            <hr>
            <p>You can test push notification below.</p>
            <button type="submit" class="mdl-button mdl-js-button mdl-button--raised mdl-button--colored" onclick="push_message()">Test Push Notification</button>
        </section>
    </main>

    <script src="{{ url_for('static',filename='main.js') }}"></script>
    <script src="https://code.getmdl.io/1.2.1/material.min.js"></script>
</body>



</html>

Everything is set up now. Just make sure you have followed the same directory structure for your files, or have a look at the repository mentioned above.

Now run the Flask server:

# you can change the port inside main.py
python main.py

Visit your site, but not in an incognito window:

http://localhost:8080/

You will see something like this:

[Screenshots: webpus1-min, webpush2-min, webpush3-min]

Click on ENABLE PUSH MESSAGING and allow notifications in the dialog box that appears. If no dialog box appears, click the i icon on the left side of the address bar and allow notifications there. At the bottom there is a Test Push Notification button; click it and you will get a push notification in your browser. That's all.
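
The same test push can also be triggered without the button. The sketch below builds the request that the button sends, assuming the server runs on localhost:8080 as configured in main.py and that you paste in the subscription JSON shown on the page. The function name `build_push_request` and the placeholder subscription are hypothetical; the actual network call is left commented out so the script can be inspected safely first.

```python
import json
import urllib.request

def build_push_request(subscription_json, base_url="http://localhost:8080"):
    """Build the POST request that the Test Push Notification button sends."""
    # /push_v1/ expects the subscription as a JSON string under 'sub_token'
    body = json.dumps({"sub_token": subscription_json}).encode("utf-8")
    return urllib.request.Request(
        base_url + "/push_v1/",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

# Placeholder subscription; paste the JSON shown on the page instead
subscription_json = json.dumps({
    "endpoint": "https://push.example.com/send/abc123",
    "keys": {"p256dh": "PLACEHOLDER", "auth": "PLACEHOLDER"},
})
request = build_push_request(subscription_json)
print(request.get_method(), request.full_url)

# Uncomment to send it to a running server:
# with urllib.request.urlopen(request) as response:
#     print(response.read().decode("utf-8"))
```

Note that the subscription is encoded twice, once as a string and once inside the request body, because the backend calls json.loads on the sub_token value.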

You can also check out more Python and Django tutorials.