- # coding: utf-8
- from flask import Response
- from datetime import datetime, timedelta
- from flask import Flask
- from flask import session, request
- from flask import render_template, redirect, jsonify
- from flask_sqlalchemy import SQLAlchemy
- from werkzeug.security import gen_salt
- from flask_oauthlib.provider import OAuth2Provider
- import re
- #mysql://username:password@server/db
- app = Flask(__name__, template_folder='templates')
- app.debug = True
- app.secret_key = 'secret'
- app.config.update({
- 'SQLALCHEMY_POOL_RECYCLE':7,
- 'SQLALCHEMY_DATABASE_URI': 'mysql://root:82282578@localhost:3306/amazon',
- })
- db = SQLAlchemy(app)
- oauth = OAuth2Provider(app)
- class User(db.Model):
- id = db.Column(db.Integer, primary_key=True)
- pin = db.Column(db.String(255), unique=True)
- class Client(db.Model):
- id = db.Column(db.Integer, primary_key=True)
- client_id = db.Column(db.String(100))
- user_id = db.Column(db.ForeignKey('user.id'))
- user = db.relationship('User')
- state = db.Column(db.Text)
- _redirect_uris = db.Column(db.Text)
- _default_scopes = db.Column(db.Text)
- @property
- def client_type(self):
- return 'public'
- @property
- def redirect_uris(self):
- if self._redirect_uris:
- return self._redirect_uris.split()
- return []
- @property
- def default_redirect_uri(self):
- return self.redirect_uris[0]
- @property
- def default_scopes(self):
- if self._default_scopes:
- return self._default_scopes.split()
- return []
- @property
- def default_scope(self):
- if self.default_scopes:
- return self.default_scopes[0]
- return ''
- class Grant(db.Model):
- id = db.Column(db.Integer, primary_key=True)
- user_id = db.Column(
- db.Integer, db.ForeignKey('user.id', ondelete='CASCADE')
- )
- user = db.relationship('User')
- client_id = db.Column(
- db.String(100), db.ForeignKey('client.client_id'),
- nullable=False,
- )
- client = db.relationship('Client')
- code = db.Column(db.String(255), index=True, nullable=False)
- redirect_uri = db.Column(db.String(255))
- expires = db.Column(db.DateTime)
- _scopes = db.Column(db.Text)
- # state = db.Column(db.String(100))
- def delete(self):
- db.session.delete(self)
- db.session.commit()
- return self
- @property
- def scopes(self):
- if self._scopes:
- return self._scopes.split()
- return []
- class Token(db.Model):
- id = db.Column(db.Integer, primary_key=True)
- client_id = db.Column(
- db.String(100), db.ForeignKey('client.client_id'),
- nullable=False,
- )
- client = db.relationship('Client')
- user_id = db.Column(
- db.Integer, db.ForeignKey('user.id')
- )
- user = db.relationship('User')
- # currently only bearer is supported
- token_type = db.Column(db.String(40))
- access_token = db.Column(db.String(255), unique=True)
- refresh_token = db.Column(db.String(255), unique=True)
- expires = db.Column(db.DateTime)
- _scopes = db.Column(db.Text)
- @property
- def scopes(self):
- if self._scopes:
- return self._scopes.split()
- return []
- class Error:
- def __init__(self, description ,code):
- self.description = description
- self.code = code
- def current_user():
- if 'id' in session:
- uid = session['id']
- return User.query.get(uid)
- return None
- def validateEmail(email):
- if email != None and len(email) > 7:
- email = email.replace(' ','')
- if re.match("^.+\@(\[?)[a-zA-Z0-9\-\.]+\.([a-zA-Z]{2,3}|[0-9]{1,3})(\]?)$", email) != None:
- return True
- return False
- @app.route('/', methods=('GET', 'POST'))
- def home():
- if request.method == 'POST':
- pin = request.form.get('pin')
- client = Client(
- client_id=request.form.get('client_id'),
- _redirect_uris=request.form.get('redirect_uri'),
- _default_scopes=request.form.get('scope'),
- state = request.form.get('state'),
- )
- # 瑼X葫 email �聢撘�
- �聢撘�
- if (validateEmail(email=pin)):
- try:
- user = User.query.filter_by(pin=pin).first()
- except:
- db.session.rollback()
- db.session.close()
- user = User.query.filter_by(pin=pin).first()
- if not user:
- err = Error(code=5, description='Your account is not registered, please use App to register again.')
- return render_template('home.html', error=err,client=client)
- else:
- client.user_id = user.id
- session['id'] = user.id
- return render_template('authorize.html', user=user,client=client)
- else:
- err = Error(code=4, description='Unqualified email format')
- return render_template('home.html', error=err,client=client)
- if request.method == 'GET':
- client = Client(
- client_id=request.args.get('client_id'),
- _redirect_uris=request.args.get('redirect_uri'),
- _default_scopes=request.args.get('scope'),
- state = request.args.get('state'),
- )
- return render_template('home.html',client=client)
- @oauth.clientgetter
- def load_client(client_id):
- return Client.query.filter_by(client_id=client_id).first()
- @oauth.grantgetter
- def load_grant(client_id, code):
- return Grant.query.filter_by(client_id=client_id, code=code).first()
- @oauth.grantsetter
- def save_grant(client_id, code, request, *args, *#grants = Grant.query.filter_by(client_id=request.client.client_id)
- nt.cl#for g in grants:
- g i# g.delete()
- g.# decide the expires time yourself
- time yourself
- expires = datetime.utcnow() + timedelta(seconds=100)
- grant = Grant(
- client_id=client_id,
- code=code['code'],
- redirect_uri=request.redirect_uri,
- _scopes=' '.join(request.sco# state = request.state,
- equest.state,
- user=current_user(),
- expires=expires
- )
- db.session.add(grant)
- db.session.commit()
- return grant
- @oauth.tokengetter
- def load_token(access_token=None, refresh_token=None):
- if access_token:
- return Token.query.filter_by(access_token=access_token).first()
- elif refresh_token:
- return Token.query.filter_by(refresh_token=refresh_token).first()
- @oauth.tokensetter
- def save_token(token, request, *args, **kwargs):
- toks = Token.query.filter_by(
- client_id=request.client.client_id,
- user_id=request.user# make sure that every client has only one token connected to a user
- ted to a user
- for t in toks:
- db.session.delete(t)
- expires_in = token.pop('expires_in')
- expires = datetime.utcnow() + timedelta(seconds=expires_in)
- tok = Token(
- access_token=token['access_token'],
- refresh_token=token['],
- token_type=token['n_type=token['token_type'],
- _scopes=token['scope'],
- expires=expires,
- client_id=request.client.client_id,
- user_id=request.user.i#user = User.query.filter_by(id=tok.user_id).first()
- r_id#tokenLog = open(r'/var/www/oauth2/token.log','a')
- ken.#tokenLog.write('save_token user_pin:%s access_token:%s refresh token:%s \n' % (user.pin,tok.access_token,tok.refresh_token))
- fres#tokenLog.close()
- enLog.close()
- db.session.add(tok)
- db.session.commit()
- return tok
- @app.route('/oauth/token', methods=['GET', 'POST'])
- @oauth.token_handler
- def access_token():
- return None
- @app.route('/oauth/authorize', methods=['GET','POST'])
- @oauth.authorize_handler
- def authorize(*args, **kwargs):
- if request.method == 'POST':
- user_id = request.form.get('user_id')
- client = Client(
- client_id=request.form.get('client_id'),
- _redirect_uris=request.form.get('redirect_uri'),
- _default_scopes=request.form.get('scope'),
- state = request.form.get('state')
- )
- if request.form.get('yes'irm') == 'yes':
- client.user_id = user_id
- dbClient = Client.query.filter_by(client_id=client.client_id).first()
- if not dbClient:
- db.session.add(client)
- else:
- dbClient.user_id = user_id
- dbClient._redirect_uris = request.form.get('redirect_uri')
- dbClient._default_scopes = request.form.get('scope')
- dbClient.state = request.form.get('state')
- db.session.commit()
- db.session.close()
- confirm = request.form.get('confirm','no')
- return confirm == 'yes'
- else:
- err = Error(code=3, description='Authorization failed.')
- return render_template('home.html', error=err,client=client)
- @app.route('/echo/privacy-policy')
- def privacy():
- return render_template('privacy.html')
- @app.route('/dnvalidation/')
- def dnvalidation():
- return render_template('DN_CHECK_FILE.htm')
- @app.route('/echo/faq')
- def faq():
- return render_template('amazonFAQ.html')
- @app.route('/api/me')
- @oauth.require_oauth()
- def me():
- user = request.oauth.user
- return jsonify(pin=user.pin)
- @app.teardown_appcontext
- def shutdown_session(exception=None):
- db.session.remove()
- if exception and db.session.is_active:
- db.session.rollback()
- if __name__ == '__main__':
- app.run(port=443,host='amazon.ioshop.co
Raw Paste