import { defer } from 'rxjs/observable/defer';
import { Observable } from 'rxjs/Observable';

// Taken from https://gist.github.com/evxn/750702f7c8e8d5a32c7b53167fe14d8d
// Example:
/*import { from } from 'rxjs/observable/from';

from([1, 2, 3])
  .pipe(doOnSubscribe(() => console.log('subscribed to stream')))
  .subscribe(x => console.log(x), null, () => console.log('completed'));*/

/**
 * Builds a pipeable operator that runs a callback each time the resulting
 * observable is subscribed to.
 *
 * The source is wrapped in `defer`, so the callback is invoked inside the
 * deferred factory, right before the source observable is handed back to be
 * subscribed to.
 *
 * @param onSubscribe Callback invoked, with no arguments, on subscription.
 * @returns An operator function that maps a source observable to one that
 *   calls `onSubscribe` and then yields the same source.
 */
export function doOnSubscribe<T>(onSubscribe: () => void): (source: Observable<T>) => Observable<T> {
  const inner = (source: Observable<T>): Observable<T> => {
    // Factory handed to `defer`: fire the hook first, then expose the source.
    const notifyThenSource = (): Observable<T> => {
      onSubscribe();
      return source;
    };
    return defer(notifyThenSource);
  };
  return inner;
}
